jbossjboss-4.2.xjbossmq

JBossMQ message redelivery + DLQ


I'm trying some scenarios with JMS and JBoss 4.2.2 and I have few problems with it.

I have a Queue

<mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.mq.destination:service=Queue,name=notificationQueue">
  <attribute name="JNDIName">jms.queue.testQueue</attribute>
  <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
  <depends optional-attribute-name="SecurityManager">jboss.mq:service=SecurityManager</depends>
  <attribute name="SecurityConf">
    <security>
      <role name="testUser" read="true" write="true" />
    </security>
  </attribute>
</mbean>

and

<invoker-proxy-binding>
  <name>message-driven-bean</name>
  <invoker-mbean>default</invoker-mbean>
  <proxy-factory>org.jboss.ejb.plugins.jms.JMSContainerInvoker</proxy-factory>
  <proxy-factory-config>
    <JMSProviderAdapterJNDI>DefaultJMSProvider</JMSProviderAdapterJNDI>
    <ServerSessionPoolFactoryJNDI>StdJMSPool</ServerSessionPoolFactoryJNDI>
    <CreateJBossMQDestination>true</CreateJBossMQDestination>
    <MinimumSize>1</MinimumSize>
    <MaximumSize>15</MaximumSize>
    <MaxMessages>16</MaxMessages>
    <MDBConfig>
      <ReconnectIntervalSec>10</ReconnectIntervalSec>
      <DLQConfig>
        <DestinationQueue>queue/DLQ</DestinationQueue>
        <MaxTimesRedelivered>3</MaxTimesRedelivered>
        <TimeToLive>0</TimeToLive>
        <DLQUser>jbossmquser</DLQUser>
        <DLQPassword>letmein</DLQPassword>
      </DLQConfig>
    </MDBConfig>
  </proxy-factory-config>
</invoker-proxy-binding>

To test redelivery I wrote MessageListener

import java.util.*;
import javax.jms.*;
import javax.naming.*;

public class NotifyQueueMessageListener {

    public static void main(String[] args) throws NamingException, JMSException {
        Hashtable<String, String> contextProperties = new Hashtable<String, String>();
        contextProperties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        contextProperties.put(Context.PROVIDER_URL, "jnp://localhost:7099");

        InitialContext initContext = new InitialContext(contextProperties);

        Queue queue = (Queue) initContext.lookup("jms.queue.testQueue");
        QueueConnection queueConnection = null;
        try {
            QueueConnectionFactory connFactory = (QueueConnectionFactory) initContext.lookup("ConnectionFactory");
            queueConnection = connFactory.createQueueConnection("jbossmquser", "letmein");
            Session queueSession = queueConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            queueConnection.setExceptionListener(new MyExceptionListener());
            MessageConsumer consumer = queueSession.createConsumer(queue);
            MyMessageListener messageListener = new MyMessageListener();
            consumer.setMessageListener(messageListener);
            queueConnection.start();

            Object o = new Object();
            synchronized (o) {
                o.wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("closing connection");
            if (queueConnection != null) {
                queueConnection.close();
            }
        }
    }

    static class MyMessageListener implements MessageListener {

        @Override
        public void onMessage(Message message) {
            if (message instanceof ObjectMessage) {
                ObjectMessage om = (ObjectMessage) message;
                try {
                    System.out.printf("MyMessageListener.onMessage( %s ), %s\n\n", om, om.getObject());
                    boolean throwException = om.getBooleanProperty("throw");
                    if (throwException) {
                        System.out.println("throwing exception");
                        throw new NullPointerException("just for testing");
                    }
                    message.acknowledge();
                } catch (JMSException jmse) {
                    jmse.printStackTrace();
                }
            }
        }

    }

    static class MyExceptionListener implements ExceptionListener {

        @Override
        public void onException(JMSException jmse) {
            jmse.printStackTrace();
        }

    }

}

and MessageSender

import java.text.*;
import java.util.*;
import javax.jms.*;
import javax.naming.*;

public class MessageSender {

    public static void main(String[] args) throws NamingException, JMSException {
        Hashtable<String, String> contextProperties = new Hashtable<String, String>();
        contextProperties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        contextProperties.put(Context.PROVIDER_URL, "jnp://localhost:7099");

        InitialContext initContext = new InitialContext(contextProperties);

        Queue queue = (Queue) initContext.lookup("notificationQueue");

        QueueConnection queueConnection = null;
        try {
            QueueConnectionFactory connFactory = (QueueConnectionFactory) initContext.lookup("ConnectionFactory");
            queueConnection = connFactory.createQueueConnection("jbossmquser", "letmein");
            // QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            // QueueSession queueSession = queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);
            QueueSession queueSession = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
            // QueueSession queueSession = queueConnection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);

            QueueSender sender = queueSession.createSender(queue);
            ObjectMessage message = queueSession.createObjectMessage();
            message.setBooleanProperty("throw", true); // to throw exception in listener
            message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
            message.setIntProperty("JMS_JBOSS_REDELIVERY_LIMIT", 3);

            sender.send(message);
        } finally {
            System.out.println("closing connection");
            if (queueConnection != null) {
                queueConnection.close();
            }
        }
    }
}

Expected behavior

Because I'm throwing Exception in onMessage() I expect that message will tried again several times (<MaxTimesRedelivered>3</MaxTimesRedelivered>) and after that it will be moved to DLQ, but it's not.

What I tried

I tried all acknowledge modes (AUTO, CLIENT, DUPS_OK) together with commiting, acknowledging but nothing worked, even message wasn't sent again.

I have no idea what's wrong. There is nothing relevant in JBoss logs.

When I try to stop and run again MesageListener I'm getting:

MyMessageListener.onMessage( org.jboss.mq.SpyObjectMessage {
Header { 
   jmsDestination  : QUEUE.notificationQueue
   jmsDeliveryMode : 2
   jmsExpiration   : 0
   jmsPriority     : 4
   jmsMessageID    : ID:13-13577584629501
   jmsTimeStamp    : 1357758462950
   jmsCorrelationID: 20130109200742
   jmsReplyTo      : null
   jmsType         : null
   jmsRedelivered  : true
   jmsProperties   : {JMSXDeliveryCount=7, throw=true, JMS_JBOSS_REDELIVERY_LIMIT=3, JMS_JBOSS_REDELIVERY_COUNT=6}
   jmsPropReadWrite: false
   msgReadOnly     : true
   producerClientId: ID:13
}
} ), my message (2013-01-09 20:07:42)

MyMessageListener.onMessage( org.jboss.mq.SpyObjectMessage {
Header { 
   jmsDestination  : QUEUE.notificationQueue
   jmsDeliveryMode : 2
   jmsExpiration   : 0
   jmsPriority     : 4
   jmsMessageID    : ID:15-13577584942741
   jmsTimeStamp    : 1357758494274
   jmsCorrelationID: 20130109200814
   jmsReplyTo      : null
   jmsType         : null
   jmsRedelivered  : true
   jmsProperties   : {JMSXDeliveryCount=6, throw=true, JMS_JBOSS_REDELIVERY_LIMIT=3, JMS_JBOSS_REDELIVERY_COUNT=5}
   jmsPropReadWrite: false
   msgReadOnly     : true
   producerClientId: ID:15
}
} ), my message (2013-01-09 20:08:14)

MyMessageListener.onMessage( org.jboss.mq.SpyObjectMessage {
Header { 
   jmsDestination  : QUEUE.notificationQueue
   jmsDeliveryMode : 2
   jmsExpiration   : 0
   jmsPriority     : 4
   jmsMessageID    : ID:20-13577586971991
   jmsTimeStamp    : 1357758697199
   jmsCorrelationID: 20130109201137
   jmsReplyTo      : null
   jmsType         : null
   jmsRedelivered  : true
   jmsProperties   : {JMSXDeliveryCount=2, throw=true, JMS_JBOSS_REDELIVERY_LIMIT=3, JMS_JBOSS_REDELIVERY_COUNT=1}
   jmsPropReadWrite: false
   msgReadOnly     : true
   producerClientId: ID:20
}
} ), my message (2013-01-09 20:11:37)

MyMessageListener.onMessage( org.jboss.mq.SpyObjectMessage {
Header { 
   jmsDestination  : QUEUE.notificationQueue
   jmsDeliveryMode : 2
   jmsExpiration   : 0
   jmsPriority     : 4
   jmsMessageID    : ID:21-13577587683201
   jmsTimeStamp    : 1357758768320
   jmsCorrelationID: 20130109201248
   jmsReplyTo      : null
   jmsType         : null
   jmsRedelivered  : true
   jmsProperties   : {JMSXDeliveryCount=2, throw=true, JMS_JBOSS_REDELIVERY_LIMIT=3, JMS_JBOSS_REDELIVERY_COUNT=1}
   jmsPropReadWrite: false
   msgReadOnly     : true
   producerClientId: ID:21
}
} ), my message (2013-01-09 20:12:48)

as you can see I tried also JMS_JBOSS_REDELIVERY_LIMIT.

Any idea?


Solution

  • I found very helpful post

    https://community.jboss.org/wiki/ThrowingExceptionsFromAnMDB

    that reads:

    What type of Exceptions should an MDB throw?

    The quick answer is none.

    When I used transaction and createQueueSession(true, Session.SESSION_TRANSACTED) it worked just fine (redelivery and also DLQ).