jmsadvanced-queuing

Handling JMS transactions and redelivery with multi-threaded listeners


I am using JMS to process messages in a Java 1.8 SE environment. The messages originate from an Oracle Advanced Queue. Because it may take a while to process a message, I decided to have a pool of 5 worker threads (the MessageHandler objects), so that more than one thread could be processing messages at once. I would like to have guaranteed delivery with no duplicate message delivery.

I use

queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);

to create the QueueSession. I use the code below to process incoming messages. Basically, onMessage spawns a thread that processes a message.

public class JmsQueueListener implements MessageListener
{
    /** A pool of worker threads for handling requests. */
    private final ExecutorService pool;

    OracleJmsQueue queue;

    public void onMessage(Message msg)
    {
        pool.execute(new MessageHandler(msg));
        // can't commit here - the thread may still be processing
    }

    /**
     * This class provides a "worker thread" for processing a message
     * from the queue.
     */
    private class MessageHandler implements Runnable {

        /**
         * The message to process
         */
        Message message;

        /**
         * The constructor stores the passed in message as a field
         */
        MessageHandler(Message message) {
            this.message = message;
        }

        /**
         * Processes the message provided to the constructor by
         * calling the appropriate business logic.
         */
        public void run() {
            QueueSession queueSession = queue.getQueueSession();
            try {
                String result = requestManager.processMessage(message);

                if (result != null) {
                    queueSession.commit();
                }
                else {
                    queueSession.rollback();
                }
            }
            catch (Exception ex) {
                try {
                    queueSession.rollback();
                }
                catch (JMSException e) {
                }
            }
        }
    }   //  class MessageHandler

My problem is that I don't know how to indicate to the originating queue whether or not processing has completed successfully. I can't commit at the end of onMessage, because the thread may not have completed processing. I don't think that where I currently have commits and rollbacks is any good either. For example, if the 5 worker threads are in various states of completion, what is the state of the queue session being committed?

I think I must be missing some fundamental concept on how to handle JMS in a multi-threaded fashion. Any help would be much appreciated.


Solution

  • You are using a asynchronous message processing so, unless you implement a way ensure that each message processing is done chronologically, you will end up the a scenario where an older message processing finishes after a recent message processing. So why using a messaging service?

    A simple solution to your problem is commit at end of the onMessage method, and in the body of your messageHandler re-enqueue the message in case of an error. However this solution can have a problem, if the re-enqueue itself fails.