spring-kafka spring-cloud-stream

How can I conditionally deserialize a Kafka message based on some condition?


I have Avro definitions for a number of events, one of which is a record containing a union type of several other events. Using a simple example to illustrate:

I then have a topic, let's call it "person.domain-events", and have configured the schema registry so that it only accepts PersonDomainEvent on that topic.

I am able to produce events onto the topic just fine using something like this:

    public void send(PersonDomainEvent event, String personId) {
        Message<PersonDomainEvent> message = MessageBuilder.withPayload(event)
                .setHeader(KEY, personId)
                .setHeader("type", "org.something.x.y.z.PersonCreatedEvent")
                .build();
        streamBridge.send("person.domain-events", message);
    }

On the consumer side, though, what would be the best way to write it so that it only invokes the logic for specific payload types? For example, suppose I wanted a consumer that only cared about PersonCreatedEvent and PersonMaritalStatusChangedEvent, and for all other events would simply advance the offset.

If the producer decides to put a new event type on the topic and the consumer has not yet updated its Avro POJOs, the consumer will be unable to deserialize the new type. Is there some sort of message filter/interceptor in Spring that I can wire up for each binding, which can deserialize the key and, based on the header, decide whether to deserialize the payload or just advance the offset?


Solution

  • The event routing feature in Spring Cloud Stream may be a solution in this situation. See the reference docs: https://docs.spring.io/spring-cloud-stream/reference/4.1-SNAPSHOT/spring-cloud-stream/event-routing.html

    Basically, you enable function routing via the property spring.cloud.stream.function.routing.enabled=true.

    When you do this, Spring Cloud Stream creates a binding named functionRouter-in-0, and you produce messages to the topic bound to this binding. (By default, Spring Cloud Stream expects the topic to be named functionRouter-in-0 as well, but you can change that via the destination property on the binding.)
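
    As a minimal configuration sketch, assuming the topic from the question ("person.domain-events") is the one to route from, the two settings described above could look like this in application.properties:

    ```properties
    # Enable the RoutingFunction and its functionRouter-in-0 binding
    spring.cloud.stream.function.routing.enabled=true
    # Point the routing binding at the existing topic instead of the default name
    spring.cloud.stream.bindings.functionRouter-in-0.destination=person.domain-events
    ```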

    Any message published via this binding is routed through a special RoutingFunction, which applies the following logic:

    /*
         * - Check if `this.routingCallback` is present and if it is use it (only for Message input)
         * If NOT
         * - Check if spring.cloud.function.definition is set in header and if it is use it.(only for Message input)
         * If NOT
         * - Check if spring.cloud.function.routing-expression is set in header and if it is set use it (only for Message input)
         * If NOT
         * - Check `spring.cloud.function.definition` is set in FunctionProperties and if it is use it (Message and Publisher)
         * If NOT
         * - Check `spring.cloud.function.routing-expression` is set in FunctionProperties and if it is use it (Message and Publisher)
         * If NOT
         * - Fail
         */
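
    To make the precedence order concrete, here is a simplified plain-Java model of the chain described in that comment. It is not the actual RoutingFunction source: the class name, the use of a Map in place of message headers, and the `evaluate` function standing in for expression evaluation are all assumptions for illustration, as is the choice to fall through when the callback returns no name.

    ```java
    import java.util.Map;
    import java.util.function.Function;

    // Simplified model of the routing precedence; NOT the real RoutingFunction.
    final class RoutingPrecedenceSketch {

        static final String DEFINITION = "spring.cloud.function.definition";
        static final String EXPRESSION = "spring.cloud.function.routing-expression";

        // Returns the value as a String if it is a non-blank String, otherwise null.
        private static String text(Object value) {
            return (value instanceof String && !((String) value).trim().isEmpty())
                    ? (String) value : null;
        }

        // callback: stand-in for a routing callback (may be null).
        // evaluate: stand-in for evaluating a routing expression to a function name.
        static String resolve(Function<Map<String, Object>, String> callback,
                              Map<String, Object> headers,
                              Map<String, String> properties,
                              Function<String, String> evaluate) {
            // 1. Routing callback, if present (assumed to fall through on an empty result).
            String name = (callback != null) ? text(callback.apply(headers)) : null;
            if (name != null) return name;
            // 2. Function definition carried in a message header.
            name = text(headers.get(DEFINITION));
            if (name != null) return name;
            // 3. Routing expression carried in a message header.
            String expression = text(headers.get(EXPRESSION));
            if (expression != null) return evaluate.apply(expression);
            // 4. Function definition from application properties.
            name = text(properties.get(DEFINITION));
            if (name != null) return name;
            // 5. Routing expression from application properties.
            expression = text(properties.get(EXPRESSION));
            if (expression != null) return evaluate.apply(expression);
            // 6. Nothing matched.
            throw new IllegalStateException("Failed to establish route");
        }
    }
    ```

    The point of the model is only the ordering: a callback wins over headers, and headers win over application properties.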
    

    MessageRoutingCallback lets you route messages to a named function programmatically.

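    See its contract in the Javadoc (the comment still mentions FunctionRoutingResult, but the method shown returns the function definition as a String):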

        /**
         * Computes and returns the instance of {@link FunctionRoutingResult} which encapsulates,
         * at the very minimum, function definition.
         * <br><br>
         * Providing such message is primarily an optimization feature. It could be useful for cases
         * where routing procedure is complex and results in, let's say, conversion of the payload to
         * the target type, which would effectively be thrown away if the ability to modify the target
         * message for downstream use didn't exist, resulting in repeated transformation, type conversion etc.
         *
         * @param message input message
         * @return instance of {@link FunctionRoutingResult} containing the result of the routing computation
         */
        default String routingResult(Message<?> message) {
            return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
        }
    

    Here is a sample that demonstrates this feature (note: the sample is based on older versions, so some method signatures may differ, but the general idea still holds): https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/routing-samples/message-routing-callback
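
    Applied to the question, the routing decision could be as simple as a lookup on the "type" header. The sketch below is plain Java with a Map standing in for the message headers, so it runs without Spring. In a real application this logic would presumably sit inside a MessageRoutingCallback bean's routingResult(Message<?>) method, reading message.getHeaders(). The class name and the function names ("onPersonCreated", "onMaritalStatusChanged", "ignoreEvent") are hypothetical, and the package of PersonMaritalStatusChangedEvent is assumed to match the one in the producer example.

    ```java
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    // Plain-Java stand-in for the decision a routing callback would make.
    // All function names below are hypothetical bean names.
    final class TypeHeaderRouter {

        static final String IGNORE = "ignoreEvent"; // hypothetical no-op consumer

        private final Map<String, String> routes = new HashMap<>();

        TypeHeaderRouter() {
            routes.put("org.something.x.y.z.PersonCreatedEvent", "onPersonCreated");
            // Package assumed to match the one used in the producer example.
            routes.put("org.something.x.y.z.PersonMaritalStatusChangedEvent",
                    "onMaritalStatusChanged");
        }

        // Returns the name of the function that should handle a message with these headers.
        String routingResult(Map<String, Object> headers) {
            Object raw = headers.get("type");
            String type = null;
            if (raw instanceof String) {
                type = (String) raw;
            } else if (raw instanceof byte[]) {
                // Depending on header mapping, values may arrive as raw bytes.
                type = new String((byte[]) raw, StandardCharsets.UTF_8);
            }
            // Unknown or missing types fall through to the no-op function.
            return routes.getOrDefault(type, IGNORE);
        }
    }
    ```

    With this shape, event types the consumer does not know about are routed to a consumer that does nothing and returns normally, so the offset should advance as for any successfully handled record. Whether the payload is deserialized before routing depends on where deserialization happens: if the Avro deserializer runs natively in the Kafka consumer, it runs before the router sees the message, so skipping it for unknown types would likely require leaving the payload as raw bytes until after routing.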