I am using an executor service to ensure that incoming JMS messages are acknowledged after I write them to my database. (Using XA data sources and distributed transactions is not an option for us right now.) To achieve this, my flow writes to the database when a message is received and then uses an ExecutorService to hand the rest of the processing off to a new thread.
@Bean
public Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec() {
    final ExecutorService executorService = Executors.newCachedThreadPool();
    return channels -> channels.executor(executorService);
}

@Bean
public Consumer<HeaderEnricherSpec> errorChannelSpec(MessageChannel genericExceptionChannel) {
    return h -> h.header(MessageHeaders.ERROR_CHANNEL, genericExceptionChannel);
}

@Bean
public IntegrationFlow jmsMessageFlow(
        @Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory,
        Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec)
{
    return IntegrationFlow.from(
            Jms.messageDrivenChannelAdapter(connectionFactory)
                .destination("INCOMING_QUEUE")
                .configureListenerContainer(
                    jmsListenerContainerSpec.andThen(spec -> spec.id("ListenerContainer")))
                .errorChannel(genericExceptionChannel)
                .outputChannel("messageHandlingChannel"))
        // save message in db
        .handle(
            (payload, headers) -> databaseService.save(payload),
            spec -> spec.advice(messageRetryAdvice).id("persistClientMessage"))
        // new thread so that the jms message is acknowledged
        .channel(jmsTxCommitingChannelSpec)
        .enrichHeaders(errorChannelSpec)
        .handle(
            (payload, headers) -> messageParser.extractMessageMetadata(payload),
            spec -> spec.id("extractMessageMetadata"))
        .route(incomingMessageRouter)
        .get();
}
I am not sure how to close the executor service in this case, or whether it should be closed at all.
Consider using a ThreadPoolTaskExecutor
instead of Executors.newCachedThreadPool(),
and declare it as a bean. It has proper lifecycle management: Spring shuts it down when the application context is closed.
See more info in docs: https://docs.spring.io/spring-framework/reference/integration/scheduling.html#scheduling-task-executor
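As a rough sketch of that suggestion, the executor could be declared as its own bean and injected into the existing jmsTxCommitingChannelSpec bean. The class name JmsExecutorConfig, the bean name jmsHandoffExecutor, the thread name prefix, and all pool sizes and timeouts below are assumptions made for illustration, not values taken from the question; tune them for your own load.

```java
import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.Channels;
import org.springframework.integration.dsl.MessageChannelSpec;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class JmsExecutorConfig { // hypothetical class name

    // Declared as a bean, so the container initializes it on startup
    // and shuts it down when the application context is closed.
    @Bean
    public ThreadPoolTaskExecutor jmsHandoffExecutor() { // hypothetical bean name
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);                  // assumed value
        executor.setMaxPoolSize(16);                  // assumed value
        executor.setQueueCapacity(100);               // assumed value
        executor.setThreadNamePrefix("jms-handoff-"); // assumed prefix, helps in thread dumps
        // Optional: let in-flight tasks finish on shutdown, waiting up to 30 seconds.
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(30);      // assumed value
        return executor;
    }

    // Same bean as in the question, but backed by the managed executor
    // instead of Executors.newCachedThreadPool().
    @Bean
    public Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec(
            ThreadPoolTaskExecutor jmsHandoffExecutor) {
        return channels -> channels.executor(jmsHandoffExecutor);
    }
}
```

The flow definition itself would not need to change, since it only depends on the jmsTxCommitingChannelSpec function. Note that, unlike a cached thread pool, this pool is bounded, so the queue capacity and maximum pool size should be chosen with the expected message rate in mind.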