I have a requirement where I have to read a message from an IBM MQ queue and then publish it to 3 different queues after performing different transformations for each.
Is it possible to handle this in a transactional manner using Spring integration?
By transactional, what I mean is the message should be published to all 3 outbound queues. In the case of not being able to publish to one or more queues, the message should not be published (committed) to any queue, and instead should be rolled back to the inbound queue.
I managed to get this working as below.
I set the sessionTransacted attribute of the ListenerContainer to be "True" when configuring the InboundChannelAdapter.
I then used a publishSubscribeChannel to route the message to 3 separate handlers which perform the 3 different transformations.
Each of those subflows are terminated with a JMS OutboundAdapter which publishes the message to a respective queue.
As the OutboundAdapters by default use an existing transaction if it is present, the transaction created when initially receiving the message is used which ensures that until all the outbound adapters commit, the transaction is not ended.
I tested this by throwing exceptions randomly in each transformer after the message had been published in other subflows, and confirmed that the message is not written to any outbound queue in those scenarios - and the message is returned to the inbound queue. The same was observed when the application was crashed while the message was being processed.
@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow(
ConnectionFactory connectionFactory,
@Value("${test.input.queue}") String inputQueue,
@Value("${test.output.queue.1}") String outputQueue1,
@Value("${test.output.queue.2}") String outputQueue2,
@Value("${test.output.queue.3}") String outputQueue3) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.configureListenerContainer(c -> c.sessionTransacted(true))
.destination(inputQueue))
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(sf -> sf
.handle(splitterServices, "transform1")
.handle(Jms.outboundAdapter(connectionFactory)
.destination(outputQueue1)))
.subscribe(sf -> sf
.handle(splitterServices, "transform2")
.handle(Jms.outboundAdapter(connectionFactory)
.destination(outputQueue2)))
.subscribe(sf -> sf
.handle(splitterServices, "transform3")
.handle(Jms.outboundAdapter(connectionFactory)
.destination(outputQueue3))))
.get();
}