javajmsspring-jms

Getting two consumers in a JmsListenerContainerFactory to process messages sequentially


I have a class that implements JmsListenerContainerFactory with the following parameters:

factory.setSessionTransacted(true);
factory.setConcurrency("1");
factory.setAutoStartup(true);

And another class of type AtomikosConnectionFactoryBean with the following parameter:

atomikosfactory.setMaxPoolSize(2);

I also have two queues which are each being handled by one consumer:

@JmsListener(containerFactory = "jmsListenerContainerFactory", destination = "${queue-one}")
@Transactional
public <T> void handleOne(final Message<T> message)
{
   final var payload = message.getPayload();
   final var handler = getHandler(payload.getClass(););
   handler.handleOne(payload);
}

@JmsListener(containerFactory = "jmsListenerContainerFactory", destination = "${queue-two}")
@Transactional
public <T> void handleTwo(final Message<T> message)
{
   final var payload = message.getPayload();
   final var handler = getHandler(payload.getClass(););
   handler.handleTwo(payload);
}

Unfortunately, both handlers need to access the same entry in the database. Currently, this leads to unexpected behaviours as both are trying to access it at the same time. I am looking for a way to make sure they run sequentially instead of concurrently.

I've tried to make my handleOne and handleTwo methods call the same method which was tagged @Synchronized to make sure they never run at the same time. This worked insofar that one of the transactions would roll back when trying to access the resource, but by that time some operations were already executed that could not be rolled back. This then lead to errors later on when the thread tried to run again after the resource was released.


Solution

  • The solutions ended up being that I could pass an object of ThreadPoolTaskExecuter to the JmsListenerContainerFactory with the following parameter settings:

    executor.setCorePoolSize(1);
    executor.setMaxPoolSize(1);
    executor.setQueueCapacity(100);
    executor.initialize();
    

    This has the desired behaviour.