I am trying to achieve a distributed transaction in Apache TomEE. In words, the flow is:
Operations 1, 2, & 3 are all part of the same XA transaction controlled by TomEE. Therefore, under any circumstances they either all fail or all succeed.
tomee.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- TomEE container configuration as used in the question: two JMS connection
     factories, two queues and an XA-capable pooled MySQL data source. -->
<tomee>

  <!-- This resource adapter is only necessary to tell TomEE not to start its
       internal ActiveMQ instance (empty BrokerXmlConfig, dummy ServerUrl). -->
  <Resource id="MyAdapter" type="ActiveMQResourceAdapter">
    BrokerXmlConfig
    ServerUrl tcp://fakehost:666
  </Resource>

  <!-- NOTE(review): both connection factories are created directly from
       class-name and do not reference the resource adapter above; confirm
       whether their sessions are enlisted in the container transaction. -->
  <Resource id="jms/MyIncomingConnFactory"
            type="javax.jms.ConnectionFactory"
            class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
    BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
  </Resource>

  <Resource id="jms/MyOutgoingConnFactory"
            type="javax.jms.ConnectionFactory"
            class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
    BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
  </Resource>

  <!-- Physical queues on the external broker. -->
  <Resource id="jms/MyOutgoingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
    PhysicalName MY_OUTGOING_QUEUE
  </Resource>

  <Resource id="jms/MyIncomingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
    PhysicalName MY_INCOMING_QUEUE
  </Resource>

  <!-- JTA-managed pool that wraps the XA data source "myDBXA" declared below. -->
  <Resource id="jdbc/myDBXAPooled" type="DataSource">
    XaDataSource myDBXA
    DataSourceCreator dbcp
    JtaManaged true
    UserName TestUser
    Password TestPassword
    MaxWait 2000
    ValidationQuery SELECT 1
    MaxActive 15
  </Resource>

  <!-- Underlying MySQL XA data source referenced by XaDataSource above. -->
  <Resource id="myDBXA"
            type="XADataSource"
            class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
    Url jdbc:mysql://localhost:3306/test
    User TestUser
    Password TestPassword
  </Resource>

</tomee>
Springconfig.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context that looks up the container-managed JMS and JDBC resources
     via JNDI and declares the JTA transaction manager. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
         http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
         http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">

  <!-- <jee:jndi-lookup jndi-name="myDBXAPooled" id="myDatasource" resource-ref="true" /> -->

  <!-- JMS connection factories -->
  <jee:jndi-lookup id="myOutgoingConnFactory" jndi-name="jms/MyOutgoingConnFactory" resource-ref="true"/>
  <jee:jndi-lookup id="myIncomingConnFactory" jndi-name="jms/MyIncomingConnFactory" resource-ref="true"/>

  <!-- JMS destinations -->
  <jee:jndi-lookup id="myOutgoingQueue" jndi-name="jms/MyOutgoingQueue" resource-ref="true"/>
  <jee:jndi-lookup id="myIncomingQueue" jndi-name="jms/MyIncomingQueue" resource-ref="true"/>

  <!-- Pooled data source -->
  <jee:jndi-lookup id="myDatasource" jndi-name="jdbc/myDBXAPooled" resource-ref="true"/>

  <!-- JTA transaction manager -->
  <tx:jta-transaction-manager/>
  <!-- <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> -->
  <!-- the previous two ways of getting the transactionManager seems equivalent and both get Geronimo -->

</beans>
SpringConfig.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context that wires the message listener container, the message
     processor it delegates to, and the task executor used by the container. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
         http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
         http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">

  <!-- Listener container: consumes from the incoming queue and hands each
       message to myMessageProcessor, using the referenced transaction manager. -->
  <bean id="messageListener" class="com.test.MyListener">
    <property name="connectionFactory" ref="myIncomingConnFactory"/>
    <property name="destination" ref="myIncomingQueue"/>
    <!-- <property name="sessionTransacted" value="true" /> -->
    <property name="concurrentConsumers" value="1"/>
    <property name="maxConcurrentConsumers" value="6"/>
    <property name="messageListener" ref="myMessageProcessor"/>
    <property name="transactionManager" ref="transactionManager"/>
    <property name="taskExecutor" ref="msgListenersTaskExecutor"/>
  </bean>

  <!-- Message processor: receives the outgoing connection factory, the
       outgoing queue and the data source. -->
  <bean id="myMessageProcessor" class="com.test.MyMessageReceiver">
    <property name="forwardConnectionFactory" ref="myOutgoingConnFactory"/>
    <property name="forwardQueue" ref="myOutgoingQueue"/>
    <property name="datasource" ref="myDatasource"/>
  </bean>

  <!-- Executor handed to the listener container (default settings). -->
  <bean id="msgListenersTaskExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"/>

</beans>
MyMessageReceiver.java:
package com.test;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;
import org.apache.log4j.Logger;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
/**
 * JMS {@link MessageListener} used to exercise a rollback scenario.
 *
 * <p>For every received text message it inserts the text into {@code MY_TABLE},
 * forwards the text to {@code forwardQueue}, and then deliberately throws a
 * {@link RuntimeException} so that the surrounding transaction can be observed
 * rolling back.
 */
public class MyMessageReceiver implements MessageListener {

    static Logger log = Logger.getLogger(MyMessageReceiver.class);

    private ConnectionFactory forwardConnectionFactory;
    private Queue forwardQueue;
    private DataSource datasource;

    /** Injects the connection factory used to forward messages. */
    public void setForwardConnectionFactory(ConnectionFactory connFactory) {
        forwardConnectionFactory = connFactory;
    }

    /**
     * Injects the queue that messages are forwarded to.
     * The (lower-case "f") method name is kept unchanged because existing
     * configuration binds to it.
     */
    public void setforwardQueue(Queue queue) {
        forwardQueue = queue;
    }

    /** Injects the data source used for the database insert. */
    public void setDatasource(DataSource ds) {
        datasource = ds;
    }

    /**
     * Handles one incoming message: logs diagnostics, updates the database,
     * forwards the message and then always throws to force a rollback.
     *
     * @param message the received message; it is cast to {@link TextMessage}
     * @throws RuntimeException always; the original failure is attached as the cause
     */
    // NOTE(review): @Transactional only has an effect if annotation-driven
    // transactions are enabled for this bean; confirm against the Spring configuration.
    @Override
    @Transactional(propagation = Propagation.REQUIRED)
    public void onMessage(Message message) {
        log.info("************************************");
        MyListener listener = (MyListener) SpringContext.getBean("messageListener");
        listener.printInfo();
        log.info("************************************");

        TextMessage msg = (TextMessage) message;
        String text = null;
        try {
            text = msg.getText();
            if (text != null) {
                log.info("MESSAGE RECEIVED: " + text);
            }
            updateDB(text); // function call to update DB
            sendMsg(text);  // function call to publish messages to queue
            System.out.println("****************Rollback");
            // Throwing exception to rollback DB, Message should not be
            // published and consumed message sent to a DLQ
            // (Broker side DLQ configuration already done)
            throw new RuntimeException();
            //if (text!=null && text.indexOf("rollback")!=-1) throw new RuntimeException("Message content includes the word rollback");
        } catch (Exception e) {
            log.error("Rolling back the entire XA transaction");
            // Log the throwable itself: the deliberate test exception has a null
            // message, so logging only getMessage() would print "null".
            log.error(e.getMessage(), e);
            // Keep the original exception as the cause so its stack trace is not lost.
            throw new RuntimeException("Rolled back because of " + e.getMessage(), e);
        }
    }

    /**
     * Inserts {@code text} into {@code MY_TABLE(name)} using a connection from
     * the injected data source. Statement and connection are always closed.
     *
     * @param text value bound to the single insert parameter
     * @throws Exception if no connection is available or the insert fails
     */
    private void updateDB(String text) throws Exception {
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            System.out.println("*******datasource " + datasource);
            conn = datasource.getConnection();
            // Check before the first dereference so a missing connection is
            // reported explicitly instead of surfacing as a NullPointerException.
            if (conn == null) {
                throw new SQLException("DataSource returned no connection");
            }
            System.out.println("*******conn " + conn.getMetaData().getUserName());
            ps = conn.prepareStatement("INSERT INTO MY_TABLE (name) VALUES(?)");
            ps.setString(1, text);
            ps.executeUpdate();
        } finally {
            if (ps != null) {
                try {
                    ps.close();
                } catch (SQLException e) {
                    // Best effort: a close failure must not mask the primary outcome.
                    log.error(e.getMessage(), e);
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    // Best effort: a close failure must not mask the primary outcome.
                    log.error(e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Sends {@code msgToBeSent} as a text message to {@code forwardQueue} using
     * a connection from {@code forwardConnectionFactory}. Session and connection
     * are always closed.
     *
     * @param msgToBeSent text payload of the forwarded message
     * @throws Exception if creating the connection/session or sending fails
     */
    private void sendMsg(String msgToBeSent) throws Exception {
        javax.jms.Connection conn = null;
        Session session = null;
        try {
            System.out.println("*************forwardConnectionFactory" + forwardConnectionFactory);
            conn = forwardConnectionFactory.createConnection();
            // NOTE(review): with transacted=true the acknowledge-mode argument is
            // expected to be ignored per the JMS API; confirm for this provider.
            session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            MessageProducer messageProducer = session.createProducer(forwardQueue);
            TextMessage msg = session.createTextMessage(msgToBeSent);
            messageProducer.send(msg);
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    // Best effort, but logged like the JDBC close failures.
                    log.error(e.getMessage(), e);
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    // Best effort, but logged like the JDBC close failures.
                    log.error(e.getMessage(), e);
                }
            }
        }
    }
}
MyListener.java:
package com.test;
import javax.transaction.Status;
import javax.transaction.SystemException;
import org.apache.log4j.Logger;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.jta.JtaTransactionManager;
public class MyListener extends DefaultMessageListenerContainer {
static Logger log = Logger.getLogger(MyListener.class);
public void printInfo() {
try {
log.info("trans manager="+((JtaTransactionManager)this.getTransactionManager()).getTransactionManager()+","+((JtaTransactionManager)this.getTransactionManager()).getTransactionManager().getStatus()+", this.isSessionTransacted()="+this.isSessionTransacted());
log.info("STATUS_ACTIVE="+Status.STATUS_ACTIVE);
log.info("STATUS_COMMITTEDE="+Status.STATUS_COMMITTED);
log.info("STATUS_COMMITTING="+Status.STATUS_COMMITTING);
log.info("STATUS_MARKED_ROLLBACK="+Status.STATUS_MARKED_ROLLBACK);
log.info("STATUS_NO_TRANSACTION="+Status.STATUS_NO_TRANSACTION);
log.info("STATUS_PREPARED="+Status.STATUS_PREPARED);
log.info("STATUS_PREPARING="+Status.STATUS_PREPARING);
log.info("STATUS_ROLLEDBACK="+Status.STATUS_ROLLEDBACK);
log.info("STATUS_ROLLING_BACK="+Status.STATUS_ROLLING_BACK);
log.info("STATUS_UNKNOWN="+Status.STATUS_UNKNOWN);
} catch (SystemException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void forceRollback() {
try {
((JtaTransactionManager)this.getTransactionManager()).getTransactionManager().setRollbackOnly();
} catch (IllegalStateException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SystemException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
After updating the database and sending the message to the outgoing queue I am purposefully throwing a RuntimeException
just to test the transaction rollback of both the database and the message broker.
All three operations are committed in case of success, but it only rolls back the database operation in case of failure, while the two JMS operations are committed anyway.
It could either be:
I have already spent quite a lot of time fighting with this and searching for possible solutions.
It would be great to hear your opinion on this and, once again, apologies in advance if it turns out to be a mistake on my side.
I believe you need to use the ActiveMQ JCA resource adapter to ensure that connections are automatically enlisted into the XA transaction. Try this:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Suggested TomEE configuration: the JMS connection factories are obtained
     through the ActiveMQ resource adapter with XA transaction support. -->
<tomee>

  <Resource id="MyJmsResourceAdapter" type="ActiveMQResourceAdapter">
    # Do not start the embedded ActiveMQ broker
    BrokerXmlConfig =
    ServerUrl = tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
  </Resource>

  <!-- Connection factories bound to the resource adapter above. -->
  <Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory">
    resourceAdapter = MyJmsResourceAdapter
    transactionSupport = xa
  </Resource>

  <Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory">
    resourceAdapter = MyJmsResourceAdapter
    transactionSupport = xa
  </Resource>

  <!-- Set the physical queue names explicitly so they keep pointing at
       MY_OUTGOING_QUEUE and MY_INCOMING_QUEUE, as in the original setup.
       NOTE(review): without "destination" the queue name is expected to
       default to the resource id; confirm against the TomEE documentation. -->
  <Resource id="jms/MyOutgoingQueue" type="javax.jms.Queue">
    destination = MY_OUTGOING_QUEUE
  </Resource>

  <Resource id="jms/MyIncomingQueue" type="javax.jms.Queue">
    destination = MY_INCOMING_QUEUE
  </Resource>

  <!-- JTA-managed pool that wraps the XA data source "myDBXA" declared below. -->
  <Resource id="jdbc/myDBXAPooled" type="DataSource">
    XaDataSource = myDBXA
    DataSourceCreator = dbcp
    JtaManaged = true
    UserName = TestUser
    Password = TestPassword
    MaxWait = 2000
    ValidationQuery = SELECT 1
    MaxActive = 15
  </Resource>

  <Resource id="myDBXA"
            type="XADataSource"
            class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
    Url = jdbc:mysql://localhost:3306/test
    User = TestUser
    Password = TestPassword
  </Resource>

</tomee>