I have a setup where I have to read a message from a queue in an ActiveMQ broker. Once the message is read I have to do a long-running operation on the message.
Because this operation is long-running, I want to acknowledge the message as soon as possible so that the resources on the broker are released. The plan would be to execute the following steps once a message is received: insert the message into a DB, acknowledge the message, and then execute the long-running operation.
I've read about JMS and the different acknowledge modes, so before even trying to do that I decided to set up an application where I could try the different modes to understand how they are processed. Unfortunately, I cannot seem to get my desired output.
Following the information in this answer https://stackoverflow.com/a/10188078 as well as https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html I thought that by using AUTO_ACKNOWLEDGE the message would be acknowledged before my listener is even called, but if I throw an exception in the listener the message is redelivered.
I've tried both with and without setting sessionTransacted to true (via setSessionTransacted), but in both cases I get the same output: the message is redelivered when an exception is thrown in the JmsListener.
Configuration of JMS
@Bean
public ConnectionFactory connectionFactory() {
    ConnectionFactory connectionFactory =
            new ActiveMQConnectionFactory(jmsConfig.getBrokerUrl());
    return connectionFactory;
}

@Bean
public JmsTemplate jmstemplate(){
    JmsTemplate jmsTemplate = new JmsTemplate();
    jmsTemplate.setConnectionFactory(connectionFactory());
    //jmsTemplate.setSessionTransacted(true);
    jmsTemplate.setDefaultDestinationName( jmsConfig.getQueueIn() );
    return jmsTemplate;
}

@Bean
public JmsListenerContainerFactory jmsListenerContainerFactoryxxxx(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
    //factory.setConcurrency("1");
    factory.setSessionTransacted(true);
    configurer.configure(factory, connectionFactory);
    return factory;
}
JmsListener
@JmsListener(destination = "B1Q1", containerFactory = "jmsListenerContainerFactoryxxxx")
public void receiveMessage(Message message) {
    try {
        TextMessage m = (TextMessage) message;
        String messageText = m.getText();
        int retryNum = message.getIntProperty("JMSXDeliveryCount");
        long s = message.getLongProperty("JMSTimestamp");
        Date d = new Date( s );
        String dbText = String.format("Retry %d. Message: %s", retryNum, messageText);
        if ( messageText.toLowerCase().contains("exception") ) {
            logger.info("Creating exception for retry: {}", retryNum);
            throw new RuntimeException();
        }
    } catch (JMSException e) {
        logger.error("Exception!!", e);
    }
}
How should I change the code so that the message is not redelivered when an exception is thrown?
Going back to my application, where I would be inserting the message into the DB: how could I acknowledge the message in my JmsListener after the message is inserted in the DB but before executing the long-running task?
In order to be able to use AUTO_ACKNOWLEDGE or CLIENT_ACKNOWLEDGE I had to call factory.setSessionTransacted(false) after configuring the factory.
Calling configurer.configure(factory, connectionFactory) overrides the value of sessionTransacted; in my case it was setting it to true, which rendered AUTO_ACKNOWLEDGE or CLIENT_ACKNOWLEDGE ineffective. Here's the relevant code of DefaultJmsListenerContainerFactoryConfigurer.java:
public void configure(DefaultJmsListenerContainerFactory factory, ConnectionFactory connectionFactory) {
    ...
    ...
    if (this.transactionManager != null) {
        factory.setTransactionManager(this.transactionManager);
    } else {
        factory.setSessionTransacted(true);
    }
    ...
    ...
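
For reference, here is a minimal sketch of the listener container factory with the calls in the order described above: configure first, then override sessionTransacted. The class name JmsListenerConfig is only a placeholder, and the javax.jms imports are an assumption (newer Spring Boot versions use jakarta.jms instead); the bean name is the one from the question.

```java
import javax.jms.ConnectionFactory;
import javax.jms.Session;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

// Hypothetical configuration class name; only the ordering of the calls matters.
@Configuration
public class JmsListenerConfig {

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactoryxxxx(
            ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

        // Apply the Boot defaults first. As the excerpt above shows, this can
        // set sessionTransacted to true when no transaction manager is present.
        configurer.configure(factory, connectionFactory);

        // Override afterwards, otherwise the acknowledge mode has no effect.
        factory.setSessionTransacted(false);
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }
}
```

Swapping Session.CLIENT_ACKNOWLEDGE for Session.AUTO_ACKNOWLEDGE works the same way; what matters is that setSessionTransacted(false) is called after configurer.configure(...).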