dapr

Dapr PubSub streaming subscription without CloudEvent


In the dapr docs it is only stated how to include the required spec.metadata.isRawPayload in programmatic and declarative subscriptions.

However, if you need to enable dapr at runtime, you need to use streaming subscriptions (there is this slightly outdated Java example). There doesn't exist a way to pass the client the required metadata field in daprs Java SDK as far as I could determine.


Solution

  • Solution if deployed in k8s

    As long as the pods restart policy from Kubernetes ensures that crashed pods are restarted (and having dapr unavailable for a few seconds does no harm), you can

    1. Use a programmatic subscription and return an empty array at the application startup

    2. As soon as you want to enable dapr PubSub

      1. Change the endpoint to now return your desired PubSub topic including all desired metadata fields

      2. Call the shutdown api

    This forces dapr to reload and call the "programmatic subscription" endpoint again. This should also work if dapr is run via the cmd if you call a script from your application that stops dapr via the http-endpoint and restarts it via the cmd-line

    The endpoint for managing the dapr subscription

    @Get(value = "/dapr/subscribe")
    public DaprSubscribe[] getDaprSubscriptions() {
        return switch (mode) {
            case ACTIVE ->
                new DaprSubscribe[] {
                    new DaprSubscribe(
                        "<pubsub>",
                        "<topic>",
                        new DaprSubscribe.Routes("<http-processing-endpoint-of-app>"),
                        new DaprSubscribe.Metadata("true")
                    ),
                };
            case INACTIVE -> new DaprSubscribe[] {};
        };
    }
    
    @Serdeable
    public record DaprSubscribe(String pubsubname, String topic, Routes routes, Metadata metadata) {
        @Serdeable
        record Routes(@JsonProperty("default") String defaultValue) {}
    
        @Serdeable
        record Metadata(String rawPayload) {}
    }