I have Avro definitions for a number of events, one of which is a record that contains a union type of a number of other events. Using a simple example to illustrate:
I then have a topic, let's call it "person.domain-events", and have configured the schema registry so that only PersonDomainEvent is accepted on that topic.
I am able to produce events just fine on to the topic using something like this:
public void send(PersonDomainEvent event, String personId) {
    Message<PersonDomainEvent> message = MessageBuilder.withPayload(event)
            .setHeader(KEY, personId)
            .setHeader("type", "org.something.x.y.z.PersonCreatedEvent")
            .build();
    streamBridge.send("person.domain-events", message);
}
On the consumer side though - what would be the best way to write it so that it only invokes the logic on specific payload types? For example suppose I wanted a consumer which only cared about PersonCreatedEvent and PersonMaritalStatusChangedEvent - and for all other events it would just auto-increment the offset?
If the producer puts a new event type on the topic and the consumer has not yet updated its Avro POJOs, the consumer will be unable to deserialize that new type. Is there some sort of message filter/interceptor in Spring that I can wire up for each binding, which deserializes the key and, based on the header, decides whether to deserialize the payload or just increment the offset?
The event routing feature in Spring Cloud Stream may be a potential solution in this situation. See the reference docs: https://docs.spring.io/spring-cloud-stream/reference/4.1-SNAPSHOT/spring-cloud-stream/event-routing.html
Basically, you enable function routing via the property spring.cloud.stream.function.routing.enabled=true.
When you do this, it creates a binding named functionRouter-in-0, and you produce messages to the topic that is bound to this binding. (By default Spring Cloud Stream expects the topic name to also be functionRouter-in-0, but you can change that via the destination property on the binding.)
Any messages published via this binding will be routed through a special RoutingFunction, which uses the logic below for routing:
/*
* - Check if `this.routingCallback` is present and if it is use it (only for Message input)
* If NOT
* - Check if spring.cloud.function.definition is set in header and if it is use it.(only for Message input)
* If NOT
* - Check if spring.cloud.function.routing-expression is set in header and if it is set use it (only for Message input)
* If NOT
* - Check `spring.cloud.function.definition` is set in FunctionProperties and if it is use it (Message and Publisher)
* If NOT
* - Check `spring.cloud.function.routing-expression` is set in FunctionProperties and if it is use it (Message and Publisher)
* If NOT
* - Fail
*/
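To make that precedence easier to follow, here is a small framework-free sketch of the same fallback chain. It is only an illustration of the ordering described in the comment above, not the actual RoutingFunction code; the class name, method name, and the labels it returns are made up for this example, and a null argument stands in for "not set".

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RoutingPrecedenceSketch {

    /**
     * Returns a label for the first routing source that is set (non-null),
     * checked in the order listed in the quoted comment. Hypothetical helper,
     * not part of Spring Cloud Function.
     */
    public static String chooseSource(String callbackResult,
                                      String headerDefinition,
                                      String headerExpression,
                                      String propertyDefinition,
                                      String propertyExpression) {
        // LinkedHashMap keeps insertion order and allows null values.
        Map<String, String> candidates = new LinkedHashMap<>();
        candidates.put("routingCallback", callbackResult);
        candidates.put("header:spring.cloud.function.definition", headerDefinition);
        candidates.put("header:spring.cloud.function.routing-expression", headerExpression);
        candidates.put("property:spring.cloud.function.definition", propertyDefinition);
        candidates.put("property:spring.cloud.function.routing-expression", propertyExpression);

        for (Map.Entry<String, String> candidate : candidates.entrySet()) {
            if (candidate.getValue() != null) {
                return candidate.getKey();
            }
        }
        // Mirrors the final "Fail" step.
        throw new IllegalStateException("No routing source is set");
    }
}
```

So a routing callback, when present, wins over any header, and headers win over application properties.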
MessageRoutingCallback will give you advanced programmatic abilities to route the messages to a named function.
See its contract in the Javadocs:
/**
* Computes and returns the instance of {@link FunctionRoutingResult} which encapsulates,
* at the very minimum, function definition.
* <br><br>
* Providing such message is primarily an optimization feature. It could be useful for cases
* where routing procedure is complex and results in, let's say, conversion of the payload to
* the target type, which would effectively be thrown away if the ability to modify the target
* message for downstream use didn't exist, resulting in repeated transformation, type conversion etc.
*
* @param message input message
* @return instance of {@link FunctionRoutingResult} containing the result of the routing computation
*/
default String routingResult(Message<?> message) {
    return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
}
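For the case in the question, the callback could pick the function name from the "type" header that the producer already sets. Below is a minimal, framework-free sketch of just that decision. The function names (handlePersonCreated, handleMaritalStatusChanged, ignoreEvent) are hypothetical and would have to match function beans in your application; falling back to a no-op function for unknown types is my assumption about how you would skip events you don't care about. Handling a byte[] header value is also an assumption, since Kafka headers may arrive as raw bytes depending on the header mapping.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class TypeHeaderRouter {

    // Hypothetical name of a no-op function that simply lets the offset advance.
    public static final String IGNORE = "ignoreEvent";

    // Event type (as written to the "type" header by the producer) -> hypothetical function name.
    private static final Map<String, String> ROUTES = Map.of(
            "org.something.x.y.z.PersonCreatedEvent", "handlePersonCreated",
            "org.something.x.y.z.PersonMaritalStatusChangedEvent", "handleMaritalStatusChanged");

    /** Picks a function name from the "type" header; unknown or missing types go to IGNORE. */
    public static String route(Map<String, Object> headers) {
        Object raw = headers.get("type");
        String type;
        if (raw instanceof byte[]) {
            // Header delivered as raw bytes: decode as UTF-8 (assumption).
            type = new String((byte[]) raw, StandardCharsets.UTF_8);
        } else {
            type = (raw == null) ? null : raw.toString();
        }
        // Guard against null: maps created with Map.of reject null keys on lookup.
        return (type == null) ? IGNORE : ROUTES.getOrDefault(type, IGNORE);
    }
}
```

In a MessageRoutingCallback implementation, routingResult(message) could then return something like TypeHeaderRouter.route(message.getHeaders()), since MessageHeaders is a Map<String, Object>. Note that this only decides which function runs; whether the payload has already been deserialized by that point depends on how deserialization is configured for the binding.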
Here is a sample that demonstrates this feature (note: this sample is based on older versions, so some methods are probably not the same, but the general idea still holds): https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/routing-samples/message-routing-callback