I am using the @RabbitListener annotation and would like to enable transactional behavior during consumption. Per the docs, this is achieved by calling container.setChannelTransacted(true) on the message listener container bean and providing it with a transaction manager.
For asynchronous use cases with SimpleMessageListenerContainer, if an external transaction is needed, it has to be requested by the container when it sets up the listener. To signal that an external transaction is required, the user provides an implementation of PlatformTransactionManager to the container when it is configured. The following example shows how to do so:
public class ExampleExternalTransactionAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }
}
However, the docs do not give an example of how to add this setting when using the @RabbitListener annotation, which creates its own listener container under the hood - again per the docs:
The annotated endpoint infrastructure creates a message listener container behind the scenes for each annotated method, by using a RabbitListenerContainerFactory.
On top of this "channelTransacted" setting, I see there is actually a whole host of options for listener containers. So, how do we configure these options when using the @RabbitListener annotation?
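The @RabbitListener annotation only declares the endpoint. The RabbitListenerContainerFactory it refers to is what creates the MessageListenerContainer instance, so container options such as channelTransacted and the transaction manager are set on that factory.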
See the @RabbitListener Javadocs:
/**
 * The bean name of the
 * {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory} to
 * use to create the message listener container responsible to serve this endpoint.
 * <p>
 * If not specified, the default container factory is used, if any. If a SpEL
 * expression is provided ({@code #{...}}), the expression can either evaluate to a
 * container factory instance or a bean name.
 * @return the
 * {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
 * bean name.
 */
String containerFactory() default "";
Then see AbstractRabbitListenerContainerFactory:
/**
* @param transactionManager the {@link PlatformTransactionManager} to use.
* @see AbstractMessageListenerContainer#setTransactionManager
*/
public void setTransactionManager(PlatformTransactionManager transactionManager) {
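Putting the two pieces together, here is a minimal sketch of how this might look. The class names, the bean name "transactedContainerFactory", and the listener method are made up for illustration, and it assumes a ConnectionFactory and a PlatformTransactionManager bean already exist in your context. The factory setters mirror the container setters from the docs example in the question:

```java
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@EnableRabbit // enables @RabbitListener processing (Spring Boot normally does this for you)
public class TransactedListenerConfiguration {

    // Hypothetical bean name; it only has to match containerFactory on the listener below.
    // The two injected beans are assumed to be defined elsewhere in the application.
    @Bean
    public SimpleRabbitListenerContainerFactory transactedContainerFactory(
            ConnectionFactory connectionFactory,
            PlatformTransactionManager transactionManager) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Same settings as container.setChannelTransacted(...) / setTransactionManager(...),
        // applied to every container this factory creates.
        factory.setChannelTransacted(true);
        factory.setTransactionManager(transactionManager);
        return factory;
    }

    // Registered as a bean so the @RabbitListener method is discovered.
    @Bean
    public ExampleListener exampleListener() {
        return new ExampleListener();
    }
}

// Hypothetical listener class for illustration.
class ExampleListener {

    // containerFactory refers to the factory bean by name.
    @RabbitListener(queues = "some.queue", containerFactory = "transactedContainerFactory")
    public void handle(String payload) {
        // Message handling goes here; the container created by the factory above
        // runs it on a transacted channel.
    }
}
```

If you would rather not set containerFactory on every listener, naming the factory bean rabbitListenerContainerFactory should make it the default one that @RabbitListener picks up when the attribute is left empty.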
More in docs: https://docs.spring.io/spring-amqp/reference/amqp/receiving-messages/async-annotation-driven/enable.html