I'm facing a strange issue with ActiveMQ Artemis 2.32.0. I have two consumers running on two separate threads. The producer publishes a message to a JMS topic. Sometimes one of the two consumers seems to be stuck because no message is delivered to it. However, this does not happen every time: sometimes both consumers receive the message and the flow ends as expected. Below are my consumers (Tommy, Harry) and producer (WeatherChannel).
Tommy.java:
public class Tommy implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(Tommy.class);
public void receiveMessage() throws NamingException {
// Create a new initial context, which loads from jndi.properties file
Context context = new InitialContext();
// Lookup an existing Destination which is a topic in our example
Topic topic = (Topic)context.lookup("jms/test/topic");
// Resources declared in a try-with-resources block are closed automatically at the end of the block.
try(ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
JMSContext jmsContext = connectionFactory.createContext()) {
// Create a consumer and receive the String body directly (i.e. without needing to cast from Message etc.)
String messageReceived = jmsContext.createConsumer(topic).receiveBody(String.class);
logger.info("Message received by Tommy >>> {}", messageReceived);
}
}
// public static void main(String[] args) throws NamingException, InterruptedException {
// receiveMessage();
// }
@Override
public void run() {
try {
receiveMessage();
} catch (NamingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Harry.java:
public class Harry implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(Harry.class);
public static void receiveMessage() throws NamingException {
// Create a new initial context, which loads from jndi.properties file
Context context = new InitialContext();
// Lookup an existing Destination which is a topic in our example
Topic topic = (Topic)context.lookup("jms/test/topic");
// Resources declared in a try-with-resources block are closed automatically at the end of the block.
try(ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
JMSContext jmsContext = connectionFactory.createContext()) {
// Create a consumer and receive the String body directly (i.e. without needing to cast from Message etc.)
String messageReceived = jmsContext.createConsumer(topic).receiveBody(String.class);
logger.info("Message received by Harry >>> {}", messageReceived);
}
}
// public static void main(String[] args) throws NamingException, InterruptedException {
// receiveMessage();
// }
@Override
public void run() {
try {
receiveMessage();
} catch (NamingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
WeatherChannel.java:
public class WeatherChannel {
private static final Logger logger = LoggerFactory.getLogger(WeatherChannel.class);
public void broadcastMessage() throws NamingException, JMSException, InterruptedException {
// Create a new initial context, which loads from jndi.properties file
Context context = new InitialContext();
// Lookup an existing Destination which is a topic in our example
Topic topic = (Topic)context.lookup("jms/test/topic");
// Resources declared in a try-with-resources block are closed automatically at the end of the block.
try(ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
JMSContext jmsContext = connectionFactory.createContext()) {
//Create producer and send message on the fly
jmsContext.createProducer().send(topic, "Today's weather at Kolkata is pleasant with max temp 27 C");
logger.info("Message sent successfully by producer");
}
}
}
This is how I invoke the consumers and producer from a main method:
new Thread(new Tommy()).start();
new Thread(new Harry()).start();
new WeatherChannel().broadcastMessage();
Finally, I ran each consumer from its own main method and got rid of the threads. This time I don't see the issue, and both consumers receive the message successfully. Can someone point out where I went wrong?
This is most likely a result of the consumer threads not being fully up and running by the time the producer thread sends the message. A JMS Topic doesn't retain messages for consumers that are not online at the time they are sent unless there is an existing durable Topic subscription.
Since a Topic is a broadcast mechanism and not a Queue, you need to ensure that any consumer you want to receive a given message is online prior to the send. In your simple example this could be accomplished by passing a latch or other waitable resource to the objects when they are created and waiting on that latch before sending. Each consumer thread could perform a latch countdown prior to calling receive but after the consumer was created.
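A minimal sketch of the latch idea follows. So that it runs without a broker, it uses a hypothetical in-memory `FakeTopic` in place of the real JMS topic; the class names `LatchedTopicDemo`, `FakeTopic`, and `Consumer` are illustrative assumptions and not part of the original code. Comments mark where the `createConsumer` and `receiveBody` calls from the question would go.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LatchedTopicDemo {

    // Hypothetical stand-in for a non-durable topic: a message reaches
    // only the subscriptions that already exist when it is published.
    static class FakeTopic {
        private final List<BlockingQueue<String>> subscriptions = new CopyOnWriteArrayList<>();

        BlockingQueue<String> subscribe() {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            subscriptions.add(queue);
            return queue;
        }

        void publish(String message) {
            for (BlockingQueue<String> queue : subscriptions) {
                queue.add(message);
            }
        }
    }

    static class Consumer implements Runnable {
        private final String name;
        private final FakeTopic topic;
        private final CountDownLatch ready;
        private final List<String> received;

        Consumer(String name, FakeTopic topic, CountDownLatch ready, List<String> received) {
            this.name = name;
            this.topic = topic;
            this.ready = ready;
            this.received = received;
        }

        @Override
        public void run() {
            // Real code: JMSConsumer consumer = jmsContext.createConsumer(topic);
            BlockingQueue<String> subscription = topic.subscribe();
            // Count down after the consumer exists but before blocking in receive.
            ready.countDown();
            try {
                // Real code: consumer.receiveBody(String.class);
                String body = subscription.take();
                received.add(name + " received: " + body);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static List<String> runDemo() {
        FakeTopic topic = new FakeTopic();
        CountDownLatch ready = new CountDownLatch(2); // one count per consumer
        List<String> received = new CopyOnWriteArrayList<>();
        Thread tommy = new Thread(new Consumer("Tommy", topic, ready, received));
        Thread harry = new Thread(new Consumer("Harry", topic, ready, received));
        tommy.start();
        harry.start();
        try {
            // The producer waits here until both consumers have subscribed.
            if (!ready.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Consumers were not ready in time");
            }
            topic.publish("Today's weather at Kolkata is pleasant with max temp 27 C");
            tommy.join(5000);
            harry.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        for (String line : runDemo()) {
            System.out.println(line);
        }
    }
}
```

In the real application the same `CountDownLatch` would be passed to the constructors of `Tommy` and `Harry`, and `WeatherChannel` would call `await` on it before `send`. Using `await` with a timeout is a design choice here so that the producer does not hang forever if a consumer fails to start.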