I have a class that implements JmsListenerContainerFactory
with the following parameters:
factory.setSessionTransacted(true);
factory.setConcurrency("1");
factory.setAutoStartup(true);
And another class of type AtomikosConnectionFactoryBean
with the following parameter:
atomikosfactory.setMaxPoolSize(2);
I also have two queues which are each being handled by one consumer:
@JmsListener(containerFactory = "jmsListenerContainerFactory", destination = "${queue-one}")
@Transactional
public <T> void handleOne(final Message<T> message)
{
final var payload = message.getPayload();
final var handler = getHandler(payload.getClass());
handler.handleOne(payload);
}
@JmsListener(containerFactory = "jmsListenerContainerFactory", destination = "${queue-two}")
@Transactional
public <T> void handleTwo(final Message<T> message)
{
final var payload = message.getPayload();
final var handler = getHandler(payload.getClass());
handler.handleTwo(payload);
}
Unfortunately, both handlers need to access the same entry in the database. Currently, this leads to unexpected behaviours as both are trying to access it at the same time. I am looking for a way to make sure they run sequentially instead of concurrently.
I've tried to make my handleOne
and handleTwo
methods call the same method which was tagged @Synchronized
to make sure they never run at the same time. This worked insofar as one of the transactions would roll back when trying to access the resource, but by that time some operations had already been executed that could not be rolled back. This then led to errors later on, when the thread tried to run again after the resource was released.
The solution ended up being to pass an object of type ThreadPoolTaskExecutor
to the JmsListenerContainerFactory
with the following parameter settings:
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setQueueCapacity(100);
executor.initialize();
This has the desired behaviour: with a single worker thread in the pool, the two listeners can no longer run at the same time.
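To illustrate why a pool of one thread serialises the two handlers, here is a minimal sketch using plain java.util.concurrent rather than Spring. It assumes that ThreadPoolTaskExecutor behaves like the ThreadPoolExecutor it wraps. The class name SingleThreadExecutorDemo, the method peakConcurrency and the 5 ms sleep are made up for the demonstration and are not part of my actual setup.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SingleThreadExecutorDemo {

    /**
     * Submits simulated "handleOne"/"handleTwo" work to a 1-thread pool and
     * returns the highest number of handlers observed running at the same time.
     * Keep tasksPerHandler * 2 at or below 101 (1 running + 100 queued),
     * otherwise the bounded queue rejects the extra tasks.
     */
    static int peakConcurrency(int tasksPerHandler) throws InterruptedException {
        AtomicInteger active = new AtomicInteger(); // handlers currently running
        AtomicInteger peak = new AtomicInteger();   // maximum seen so far

        // Same shape as the settings above: core = max = 1, queue capacity 100.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100));

        Runnable handler = () -> {
            int now = active.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(5); // hypothetical stand-in for the database access
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                active.decrementAndGet();
            }
        };

        for (int i = 0; i < tasksPerHandler; i++) {
            executor.execute(handler); // plays the role of handleOne
            executor.execute(handler); // plays the role of handleTwo
        }

        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrency: " + peakConcurrency(5));
    }
}
```

Because the pool never has more than one worker thread, the second task always waits in the queue until the first has finished, so the peak stays at 1. The trade-off is throughput: every listener that shares this executor is processed one task at a time.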