I'm migrating from ActiveMQ Classic to ActiveMQ Artemis and need to manually handle client failover between two independent servers. The failover:
protocol from ActiveMQ Classic is not available in Artemis. How can I implement application-level failover using an ExceptionListener
to achieve a silent client reconnect as it used to happen in ActiveMQ Classic? We have the constraint that both broker run embedded into the application and independently. It is not possible to define a cluster or a primary and backup broker as discussed here.
I am in the process of migrating from ActiveMQ Classic to ActiveMQ Artemis. Our setup consists of two standalone Spring Boot applications, each with an embedded Artemis broker running independently on server1
and server2
.
In ActiveMQ Classic, we used the failover:
protocol to automatically handle client reconnections when one or both servers were down. The broker URL was configured as follows:
import org.apache.activemq.ActiveMQConnectionFactory;
@Bean
public ConnectionFactory connectionFactory(String user, String credential) {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://server1:port1,tcp://server2:port2)?jms.useAsyncSend=true&initialReconnectDelay=100&timeout=2000&randomize=false");
factory.setTrustAllPackages(true);
factory.setUserName(user);
factory.setPassword(credential);
return factory;
}
In Artemis, we updated the broker URL but discovered the failover:
protocol is no longer available. Our current URL looks like this:
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
public ConnectionFactory connectionFactory(String user, String credential) {
ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory("(tcp://server1:port1,tcp://server2:port2)?jms.useAsyncSend=true&initialReconnectDelay=100&timeout=2000&sslEnabled=false&failoverAttempts=-1&useTopologyForLoadBalancing=false&connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance.FirstElementConnectionLoadBalancingPolicy", user, credential);
return factory;
}
We aim to implement application-level failover manually, as described here. Our goal is to catch disconnection events and attempt reconnections programmatically.
Problem 1: Initial Connection Failover
When server1
is down, the client does not automatically attempt to connect to server2
, and the following exception is thrown:
org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is ActiveMQNotConnectedException[errorType=NOT_CONNECTED message=AMQ219007: Cannot connect to server(s). Tried with all available servers.]
Problem 2: Handling Disconnections and Reconnecting
When the client is connected to server1
, and server1
shuts down, the connection is lost, and the client does not attempt to reconnect to server2
. The exception listener is only called after server1
is back up.
2024-07-29 10:12:45,318 [Thread-0 (ActiveMQ-client-global-threads)] INFO o.a.a.a.c.client [] - AMQ214036: Connection closure to server1/server1:port1 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]
2024-07-29 10:13:09,546 [Thread-6] INFO d.d.m.j.ExampleListener [] - received exception in topic listener: {}
jakarta.jms.JMSException: ActiveMQDisconnectedException[errorType=DISCONNECTED message=AMQ219015: The connection was disconnected because of server shutdown]
2024-07-29 11:32:29,339 [Thread-6] INFO d.d.m.j.ExampleListener [] - received exception in topic listener: {}
jakarta.jms.JMSException: ActiveMQDisconnectedException[errorType=DISCONNECTED message=AMQ219015: The connection was disconnected because of server shutdown]
at org.apache.activemq.artemis.jms.client.ActiveMQConnection$JMSFailureListener.connectionFailed(ActiveMQConnection.java:724) ~[artemis-jakarta-client-2.33.0.jar:2.33.0]
at org.apache.activemq.artemis.jms.client.ActiveMQConnection$JMSFailureListener.connectionFailed(ActiveMQConnection.java:745) ~[artemis-jakarta-client-2.33.0.jar:2.33.0]
at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.callSessionFailureListeners(ClientSessionFactoryImpl.java:878) ~[artemis-core-client-2.33.0.jar:2.33.0]
at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.callSessionFailureListeners(ClientSessionFactoryImpl.java:866) ~[artemis-core-client-2.33.0.jar:2.33.0]
at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.failoverOrReconnect(ClientSessionFactoryImpl.java:812) ~[artemis-core-client-2.33.0.jar:2.33.0]
at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.handleConnectionFailure(ClientSessionFactoryImpl.java:576) ~[artemis-core-client-2.33.0.jar:2.33.0]
at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingFailureListener.connectionFailed(ClientSessionFactoryImpl.java:1417) ~[artemis-core-client-2.33.0.jar:2.33.0]
at org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection.callFailureListeners(AbstractRemotingConnection.java:98) ~[artemis-core-client-2.33.0.jar:2.33.0]
at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.fail(RemotingConnectionImpl.java:209) ~[artemis-core-client-2.33.0.jar:2.33.0]
at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$CloseRunnable.run(ClientSessionFactoryImpl.java:1182) ~[artemis-core-client-2.33.0.jar:2.33.0]
at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:57) ~[artemis-commons-2.33.0.jar:2.33.0]
at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32) ~[artemis-commons-2.33.0.jar:2.33.0]
at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68) ~[artemis-commons-2.33.0.jar:2.33.0]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) ~[artemis-commons-2.33.0.jar:2.33.0]
Caused by: org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException: AMQ219015: The connection was disconnected because of server shutdown
... 7 more
2024-07-29 11:32:29,351 [Thread-6] WARN d.d.m.j.ExampleListener [] - Exception is not an instance of ActiveMQDisconnectedException
Here’s the implementation of our ExceptionListener
:
public class ExampleListener implements ExceptionListener {
private static final Logger LOG = LoggerFactory.getLogger(ExampleListener.class);
@Override
public void onException(final JMSException exception) {
LOG.info("Received exception in topic listener: {}", exception);
if (exception.getLinkedException() instanceof ActiveMQDisconnectedException) {
// This if-Statement is always false.
ActiveMQDisconnectedException e = (ActiveMQDisconnectedException) exception.getLinkedException();
LOG.info("Disconnected exception type: {}", e.getType());
LOG.info("Disconnected message type: {}", e.getMessage());
// Handle reconnection logic here
} else {
LOG.warn("Exception is not an instance of ActiveMQDisconnectedException");
}
}
}
failover:
behavior in ActiveMQ Classic?ActiveMQ Artemis CORE JMS client can be configured to automatically reconnect to the same server, reconnect to the backup server or reconnect to other active servers in the event that a failure is detected in the connection between the client and the server.
Set failoverAttempts to any non-zero value and the round robin policy to reconnect to other active servers, i.e.
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
public ConnectionFactory connectionFactory(String user, String credential) {
ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory("(tcp://server1:port1,tcp://server2:port2)?jms.useAsyncSend=true&initialReconnectDelay=100&timeout=2000&sslEnabled=false&failoverAttempts=-1&useTopologyForLoadBalancing=false&connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy", user, credential);
return factory;
}