wso2activemq-classicesbttlamq

WSO2 ESB + activeMQ


I'm new in wso2 esb and jms. I send some messages from soapUI to wso2 esb. In my wso sequence a processed message goes to the jms. Is there a possibility to set "time to live" of this message from wso2 esb? Or some other ways?

In AMQ I added this:

<policyEntry queue="myQueue">
  <deadLetterStrategy>
    <individualDeadLetterStrategy
      queuePrefix="DLQ." useQueueForQueueMessages="true" />
  </deadLetterStrategy>

Something like <property name="JMSExpiration" value="today+hour_long_value" scope="transport" type="STRING"></property> in sequence have no effect.


Solution

  • The only workable way, I found out, is to create own Mediator, which will set the time-to-live of a message and send it to the queue. The queue name is preset in a sequence, then called mediator:

    <property xmlns="http://ws.apache.org/ns/synapse" name="qname" value="your_queue_name" scope="default" type="STRING"></property>
    
    <class xmlns="http://ws.apache.org/ns/synapse" name="com.example.JMSMessageTimeToLiveMediator"></class>
    

    A mediator class:

    public class JMSMessageTimeToLiveMediator extends AbstractMediator implements
        ManagedLifecycle {
    
    private static String CON_FACTORY_NAME = "QueueConnectionFactory";
    private static String DEF_PROP_QNAME = "qname";
    private static long TIME_TO_LIVE = 60000;
    
    private static QueueConnectionFactory cf;
    
    public boolean mediate(MessageContext context) {
        Connection connection = null;
        Session session = null;
        try {
            connection = cf.createQueueConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            String queueName = (String) context.getProperty(DEF_PROP_QNAME);
            Destination destination = session.createQueue(queueName);
    
            MessageProducer producer = session.createProducer(destination);
            producer.setTimeToLive(TIME_TO_LIVE);
    
            TextMessage message = session.createTextMessage(context
                    .getEnvelope().toString());
    
            producer.send(message);
        } catch (JMSException e) {
            log.error("ProduceJMS ERROR: " + e.getClass() + "   "
                    + e.getMessage());
        } catch (Exception e) {
            log.error("ProduceJMS ERROR: " + e.getClass() + "   "
                    + e.getMessage());
        } finally {
            try {
                session.close();
                connection.close();
            } catch (JMSException e) {
                log.error("ProduceJMS ERROR: " + e.getMessage());
            }
        }
    
        return true;
    }
    
    public void init(SynapseEnvironment emvironment) {
        Hashtable<String, Object> environment = new Hashtable<String, Object>();
        environment.put("java.naming.factory.initial",
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        log.debug("ProduceJMS INIT");
        try {
            InitialContext ic = new InitialContext(environment);
            cf = (QueueConnectionFactory) ic.lookup(CON_FACTORY_NAME);
        } catch (NamingException e) {
            log.error("ProduceJMS INIT ERROR: " + e.getMessage());
        }
    }
    
    public void destroy() {
    }
    

    }