We are using JMS to process messages in a Java 1.8 SE environment. The messages originate from an Oracle (12) Advanced Queue.
We would like to read a message from a JMS queue, do some work based on it, and save the result in the database. We don’t want to lose any messages, and we don’t want to duplicate processing on any message. In other words, we’d like the processing of the JMS message and the associated database activity to be a single transaction.
We’ve read various articles about how to do this (including Transaction and redelivery in JMS, JMS Message Delivery Reliability and Acknowledgement Patterns, Reliable JMS with Transactions). The consensus seems to be to use JTA/XA, but we were hoping for something simpler.
We are using Oracle’s Advanced Queueing as our JMS provider, so we decided to see whether we could use the same database connection for both JMS and database activity, so that a single commit would work for both JMS and database activity. It seems to have worked.
In the code below, we create a QueueConnection using an existing SQL Connection when we initialize the JMS queue. After processing the message, committing the JMS session also commits the database changes. We haven’t seen this approach discussed elsewhere, so we’re wondering if
Please comment on whether our approach should be reliable or whether we should use JTA/XA.
public class OracleJmsQueue {
private DataSource dataSource;
protected Queue queue;
protected QueueConnection queueConnection;
protected QueueReceiver queueReceiver;
protected QueueSession queueSession;
private java.sql.Connection dbConnection = null;
protected void initQueueSession()
throws JMSException, SQLException {
// Connect to the database source of messages
DataSource dataSource = getDataSource();
dbConnection = dataSource.getConnection();
dbConnection.setAutoCommit(false);
queueConnection = AQjmsQueueConnectionFactory.createQueueConnection(
dbConnection);
queueSession =
queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);
queue = ((AQjmsSession)queueSession).getQueue(queueUser, queueName);
queueReceiver = queueSession.createReceiver(queue);
}
public void run() {
initQueueSession();
// code omitted
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(200);
final Message message = queueReceiver.receiveNoWait();
if (message != null) {
processMessage(message); // alters DB tables
commitSession();
}
}
// catches omitted
}
}
protected void commitSession() throws JMSException {
logger.info("Committing " + queueName + " queue session");
queueSession.commit();
}
} // class OracleJmsQueue
It looks that your assumptions about JMS and OAQ are correct, given that processMessage
uses the dbConnection
class attribute.
https://docs.oracle.com/javaee/7/api/javax/jms/QueueConnection.html
So, answering your question: Yes, you have a reliable solution (assuming what I mentioned before).