We are using Spring Cloud Stream to listen to multiple queues.
spring:
cloud:
function:
definition: functionRouter;supplier
stream:
solace:
bindings:
functionRouter-in-0:
consumer:
...
bindings:
functionRouter-in-0:
destination: ${SOLACE_QUEUE},${SOLACE_DMQ}
As we expect multiple message formats on the queues, we use functionRouter and MessageRoutingCallback
to find the correct function that handles the message, so we can leverage the automatic deserialization of JSON messages. Simplified example:
class MessageRouter : MessageRoutingCallback {
override fun functionDefinition(message: Message<*>): String {
val topic = message.headers[SolaceHeaders.DESTINATION]
...
val isDmqEligible = message.headers[SolaceHeaders.DMQ_ELIGIBLE] as Boolean
if (!isDmqEligible) {
return "receiveFromDMQ"
}
return "receiveAndSend"
}
}
As both queues use the same binding, we cannot set different consumer properties like backOffMaxInterval
if we use only one functionRouter.
Is there any solution that uses a similar approach (routers that select the handler function based on message header) but support multiple routers as entry points? Something like this:
spring:
cloud:
function:
definition: functionRouter1;functionRouter2;supplier
stream:
solace:
bindings:
functionRouter1-in-0:
consumer:
...
functionRouter2-in-0:
consumer:
...
bindings:
functionRouter1-in-0:
destination: ${SOLACE_QUEUE}
functionRouter2-in-0:
destination: ${SOLACE_DMQ}
I'd be also open to any solutions (using spring cloud stream) where we can dynamically dispatch different message types from the same channel ${SOLACE_QUEUE}
and we can still leverage the automatic deserialization.
So, RoutingFunction was not really designed for this type of cases since, as you said, it's single binding and routing is happening to other functions by reference (no queue in between). That said, you can certainly do what you suggested by simply defining another router function bean with different name in your context:
@Bean
RoutingFunction functionRouter2(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
return new RoutingFunction(functionCatalog, functionProperties, new BeanFactoryResolver(beanFactory), routingCallback);
}