activemq-classic tomee-7 apache-tomee tomee-8

Apache TomEE external ActiveMQ resource not rolling back in distributed transaction


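I am trying to achieve a distributed transaction in Apache TomEE. In other words, the flow is:

1. A message is consumed from the incoming JMS queue.
2. A row is inserted into the database.
3. A message is sent to the outgoing JMS queue.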

Operations 1, 2, & 3 are all part of the same XA transaction controlled by TomEE. Therefore, under any circumstances they either all fail or all succeed.

tomee.xml

<?xml version="1.0" encoding="UTF-8"?>
<tomee>
    <!-- this resource adapter is just necessary to tell tomee to not start internal ActiveMq instance -->
    <Resource id="MyAdapter" type="ActiveMQResourceAdapter">
        BrokerXmlConfig
        ServerUrl tcp://fakehost:666
    </Resource>

    <!-- Connection factories instantiated directly from the ActiveMQ XA class via class-name;
         they are not bound to the resource adapter declared above. -->
    <Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
        BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
    </Resource>

    <Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
        BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
    </Resource>

    <!-- Queue destinations, identified on the broker by PhysicalName. -->
    <Resource id="jms/MyOutgoingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
        PhysicalName MY_OUTGOING_QUEUE
    </Resource>

    <Resource id="jms/MyIncomingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
        PhysicalName MY_INCOMING_QUEUE
    </Resource>

    <!-- JTA-managed pooled DataSource that wraps the XA data source "myDBXA" declared below. -->
    <Resource id="jdbc/myDBXAPooled" type="DataSource">
        XaDataSource myDBXA
        DataSourceCreator dbcp
        JtaManaged true
        UserName TestUser
        Password TestPassword
        MaxWait 2000
        ValidationQuery SELECT 1
        MaxActive 15
    </Resource>

    <!-- Underlying MySQL XA data source referenced by XaDataSource above. -->
    <Resource id="myDBXA" type="XADataSource" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
        Url jdbc:mysql://localhost:3306/test
        User TestUser
        Password TestPassword
    </Resource>
</tomee>

Springconfig.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
            xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">

    <!-- JNDI lookups that expose the container resources as Spring beans.
         Each jndi-name matches a Resource id declared in tomee.xml. -->

    <!-- <jee:jndi-lookup jndi-name="myDBXAPooled" id="myDatasource" resource-ref="true" />  -->
    <jee:jndi-lookup jndi-name="jms/MyOutgoingConnFactory" id="myOutgoingConnFactory" resource-ref="true" />
    <jee:jndi-lookup jndi-name="jms/MyIncomingConnFactory" id="myIncomingConnFactory" resource-ref="true" />  
    <jee:jndi-lookup jndi-name="jms/MyOutgoingQueue" id="myOutgoingQueue" resource-ref="true" />
    <jee:jndi-lookup jndi-name="jms/MyIncomingQueue" id="myIncomingQueue" resource-ref="true" />
    <jee:jndi-lookup jndi-name="jdbc/myDBXAPooled" id="myDatasource" resource-ref="true" />

    <!-- JTA transaction manager bean; presumably registered under the name
         "transactionManager", which the listener container configuration references
         (TODO confirm against the Spring tx namespace documentation). -->
    <tx:jta-transaction-manager/>
    <!-- <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> -->
    <!-- the previous two ways of getting the transactionManager seems equivalent and both get Geronimo -->


</beans>

SpringConfig.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
            xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">

    <!-- Listener container (com.test.MyListener extends DefaultMessageListenerContainer).
         It consumes from myIncomingQueue through myIncomingConnFactory and hands every
         message to myMessageProcessor. The "transactionManager" bean is not defined in
         this file; it is presumably the JTA manager declared in the other Spring
         configuration (TODO confirm). -->
    <bean id="messageListener" class="com.test.MyListener">
        <property name="connectionFactory" ref="myIncomingConnFactory" />
        <property name="destination" ref="myIncomingQueue" />
        <!-- <property name="sessionTransacted" value="true" /> -->
        <property name="concurrentConsumers" value="1" />
        <property name="maxConcurrentConsumers" value="6" />
        <property name="messageListener" ref="myMessageProcessor" />
        <property name="transactionManager" ref="transactionManager" />
        <property name="taskExecutor" ref="msgListenersTaskExecutor" />
    </bean>

    <!-- Message handler: writes the message text to the database through myDatasource
         and forwards it to myOutgoingQueue through myOutgoingConnFactory. -->
    <bean id="myMessageProcessor" class="com.test.MyMessageReceiver">
        <property name="forwardConnectionFactory" ref="myOutgoingConnFactory" />
        <property name="forwardQueue" ref="myOutgoingQueue" />
        <property name="datasource" ref="myDatasource" />
    </bean>

    <!-- Executor used by the listener container. No pool sizes are configured here.
         NOTE(review): confirm that the executor defaults provide enough threads for
         maxConcurrentConsumers=6. -->
    <bean id="msgListenersTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"/>



</beans>

MyMessageReceiver.java:

package com.test;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;

import org.apache.log4j.Logger;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
 * JMS listener that, for every incoming message, inserts the message text into
 * MY_TABLE, forwards the text to the configured outgoing queue, and then
 * deliberately throws so that the surrounding transaction is expected to roll back.
 */
public class MyMessageReceiver implements MessageListener {

    static Logger log = Logger.getLogger(MyMessageReceiver.class);

    private ConnectionFactory forwardConnectionFactory;
    private Queue forwardQueue;
    private DataSource datasource;

    /** Injects the connection factory used to forward messages. */
    public void setForwardConnectionFactory(ConnectionFactory connFactory) {
        forwardConnectionFactory=connFactory;
    }

    /** Injects the queue that forwarded messages are sent to. */
    public void setforwardQueue(Queue queue) {
        forwardQueue=queue;
    }

    /** Injects the data source used for the database insert. */
    public void setDatasource(DataSource ds) {
        datasource=ds;
    }

    /**
     * Processes one message: logs transaction diagnostics, stores the text,
     * forwards it, and then always fails on purpose to exercise the rollback path.
     *
     * @param message the received message; it is cast to {@link TextMessage}
     * @throws RuntimeException always; the original failure is attached as the cause
     */
    @Override
    @Transactional(propagation=Propagation.REQUIRED)
    public void onMessage(Message message) {

        log.info("************************************");
        MyListener listener = (MyListener)SpringContext.getBean("messageListener");
        listener.printInfo();
        log.info("************************************");

        TextMessage msg = (TextMessage) message;
        try {
            String text = msg.getText();

            if (text != null) log.info("MESSAGE RECEIVED: "+ text);

            updateDB(text); // function call to update DB

            sendMsg(text);   // function call to publish messages to queue

            System.out.println("****************Rollback");
            // Throwing exception to rollback DB, Message should not be
            // published and consumed message sent to a DLQ
            // (Broker side DLQ configuration already done)
            throw new RuntimeException();
            //if (text!=null && text.indexOf("rollback")!=-1) throw new RuntimeException("Message content includes the word rollback");

        } catch (Exception e) {
            log.error("Rolling back the entire XA transaction");
            // Log with the throwable so the stack trace is not lost.
            log.error(e.getMessage(), e);
            // Keep the original exception as the cause for whoever handles the rollback.
            throw new RuntimeException("Rolled back because of "+e.getMessage(), e);
        }

    }

    /**
     * Inserts the given text into MY_TABLE using a connection from the data source.
     *
     * @param text value stored in the "name" column
     * @throws Exception any failure raised while obtaining the connection or executing the insert
     */
    private void updateDB(String text) throws Exception {

        Connection conn = null;
        PreparedStatement ps = null;
        try {
            System.out.println("*******datasource "+datasource);
            conn = datasource.getConnection();
            // Only touch the connection after the null check.
            if (conn!=null) {
                System.out.println("*******conn "+conn.getMetaData().getUserName());
                ps = conn.prepareStatement("INSERT INTO MY_TABLE (name) VALUES(?)");
                ps.setString(1, text);
                ps.executeUpdate();
            }
        } finally {
            if (ps!=null) {
                try {
                    ps.close();
                } catch (SQLException e) {
                    // Best effort: a close failure must not mask the primary outcome.
                    log.error(e.getMessage());
                }
            }
            if (conn!=null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    // Best effort: a close failure must not mask the primary outcome.
                    log.error(e.getMessage());
                }
            }
        }

    }

    /**
     * Sends the given text as a {@link TextMessage} to the forward queue.
     *
     * @param msgToBeSent payload of the forwarded message
     * @throws Exception any failure raised while creating the connection, session or producer, or while sending
     */
    private void sendMsg(String msgToBeSent) throws Exception {

        javax.jms.Connection conn = null;
        Session session = null;
        try {
            System.out.println("*************forwardConnectionFactory"+forwardConnectionFactory);
            conn = forwardConnectionFactory.createConnection();
            session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            MessageProducer messageProducer = session.createProducer(forwardQueue);
            TextMessage msg = session.createTextMessage(msgToBeSent);
            messageProducer.send(msg);

        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    // Best effort, but report it like the JDBC close failures.
                    log.error(e.getMessage());
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    // Best effort, but report it like the JDBC close failures.
                    log.error(e.getMessage());
                }
            }
        }
    }

}

MyListener.java:

package com.test;

import javax.transaction.Status;
import javax.transaction.SystemException;

import org.apache.log4j.Logger;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.jta.JtaTransactionManager;

public class MyListener extends DefaultMessageListenerContainer {

    static Logger log = Logger.getLogger(MyListener.class);

    public void printInfo() {

        try {

            log.info("trans manager="+((JtaTransactionManager)this.getTransactionManager()).getTransactionManager()+","+((JtaTransactionManager)this.getTransactionManager()).getTransactionManager().getStatus()+", this.isSessionTransacted()="+this.isSessionTransacted());
            log.info("STATUS_ACTIVE="+Status.STATUS_ACTIVE);
            log.info("STATUS_COMMITTEDE="+Status.STATUS_COMMITTED);
            log.info("STATUS_COMMITTING="+Status.STATUS_COMMITTING);
            log.info("STATUS_MARKED_ROLLBACK="+Status.STATUS_MARKED_ROLLBACK);
            log.info("STATUS_NO_TRANSACTION="+Status.STATUS_NO_TRANSACTION);
            log.info("STATUS_PREPARED="+Status.STATUS_PREPARED);
            log.info("STATUS_PREPARING="+Status.STATUS_PREPARING);
            log.info("STATUS_ROLLEDBACK="+Status.STATUS_ROLLEDBACK);
            log.info("STATUS_ROLLING_BACK="+Status.STATUS_ROLLING_BACK);
            log.info("STATUS_UNKNOWN="+Status.STATUS_UNKNOWN);



        } catch (SystemException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    public void forceRollback() {
        try {
            ((JtaTransactionManager)this.getTransactionManager()).getTransactionManager().setRollbackOnly();
        } catch (IllegalStateException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SecurityException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SystemException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

After updating the database and sending the message to the outgoing queue I am purposefully throwing a RuntimeException just to test the transaction rollback of both the database and the message broker.

All three operations are committed in case of success, but it only rolls back the database operation in case of failure, while the two JMS operations are committed anyway.

It could either be:

I have already spent quite a lot of time fighting with this and searching for possible solutions.

It would be great to hear your opinion on this and, once again, apologies in advance if it turns out to be a mistake on my side.


Solution

  • I believe you need to use the ActiveMQ JCA resource adapter to ensure that connections are automatically enlisted into the XA transaction. Try this:

    <tomee>
        <!-- Resource adapter that points at the external broker. -->
        <Resource id="MyJmsResourceAdapter" type="ActiveMQResourceAdapter">
            # Do not start the embedded ActiveMQ broker
            BrokerXmlConfig  =
            ServerUrl = tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
        </Resource>
    
        <!-- Connection factories are bound to the resource adapter with XA transaction
             support instead of instantiating ActiveMQXAConnectionFactory directly. -->
        <Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory">
            resourceAdapter = MyJmsResourceAdapter
            transactionSupport = xa
        </Resource>
    
        <Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory">
            resourceAdapter = MyJmsResourceAdapter
            transactionSupport = xa
        </Resource>
    
        <!-- Keep the physical queue names used by the original configuration. -->
        <Resource id="jms/MyOutgoingQueue" type="javax.jms.Queue">
            destination = MY_OUTGOING_QUEUE
        </Resource>
        <Resource id="jms/MyIncomingQueue" type="javax.jms.Queue">
            destination = MY_INCOMING_QUEUE
        </Resource>
    
        <Resource id="jdbc/myDBXAPooled" type="DataSource">
            XaDataSource myDBXA
            DataSourceCreator dbcp
            JtaManaged true
            UserName TestUser
            Password TestPassword
            MaxWait 2000
            ValidationQuery SELECT 1
            MaxActive 15
        </Resource>
    
        <Resource id="myDBXA" type="XADataSource" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
            Url jdbc:mysql://localhost:3306/test
            User TestUser
            Password TestPassword
        </Resource>
    </tomee>