Tags: java, jms, activemq-artemis

Handling client failover between independent ActiveMQ Artemis servers without the built-in `failover:` protocol from ActiveMQ Classic


TL;DR:

I'm migrating from ActiveMQ Classic to ActiveMQ Artemis and need to manually handle client failover between two independent servers. The failover: protocol from ActiveMQ Classic is not available in Artemis. How can I implement application-level failover using an ExceptionListener so that the client reconnects silently, as it did in ActiveMQ Classic? One constraint: both brokers run embedded in the application and independently of each other. It is not possible to define a cluster or a primary and backup broker as discussed here.

Full Context:

I am in the process of migrating from ActiveMQ Classic to ActiveMQ Artemis. Our setup consists of two standalone Spring Boot applications, each with an embedded Artemis broker running independently on server1 and server2.

ActiveMQ Classic Client Configuration:

In ActiveMQ Classic, we used the failover: protocol to automatically handle client reconnections when one or both servers were down. The broker URL was configured as follows:

import org.apache.activemq.ActiveMQConnectionFactory;

@Bean
public ConnectionFactory connectionFactory(String user, String credential) {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://server1:port1,tcp://server2:port2)?jms.useAsyncSend=true&initialReconnectDelay=100&timeout=2000&randomize=false");
    factory.setTrustAllPackages(true);
    factory.setUserName(user);
    factory.setPassword(credential);
    return factory;
}

ActiveMQ Artemis Client Configuration:

In Artemis, we updated the broker URL but discovered the failover: protocol is no longer available. Our current URL looks like this:

import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;

public ConnectionFactory connectionFactory(String user, String credential) {
    ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory("(tcp://server1:port1,tcp://server2:port2)?jms.useAsyncSend=true&initialReconnectDelay=100&timeout=2000&sslEnabled=false&failoverAttempts=-1&useTopologyForLoadBalancing=false&connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance.FirstElementConnectionLoadBalancingPolicy", user, credential);
    return factory;
}

Issue:

We aim to implement application-level failover manually, as described here. Our goal is to catch disconnection events and attempt reconnections programmatically.

Problem 1: Initial Connection Failover

When server1 is down, the client does not automatically attempt to connect to server2, and the following exception is thrown:

org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is ActiveMQNotConnectedException[errorType=NOT_CONNECTED message=AMQ219007: Cannot connect to server(s). Tried with all available servers.]

Problem 2: Handling Disconnections and Reconnecting

When the client is connected to server1 and server1 shuts down, the connection is lost and the client does not attempt to reconnect to server2. The exception listener is only called after server1 is back up.

2024-07-29 10:12:45,318 [Thread-0 (ActiveMQ-client-global-threads)] INFO   o.a.a.a.c.client [] - AMQ214036: Connection closure to server1/server1:port1 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]
2024-07-29 10:13:09,546 [Thread-6] INFO   d.d.m.j.ExampleListener [] - received exception in topic listener: {}
jakarta.jms.JMSException: ActiveMQDisconnectedException[errorType=DISCONNECTED message=AMQ219015: The connection was disconnected because of server shutdown]
2024-07-29 11:32:29,339 [Thread-6] INFO   d.d.m.j.ExampleListener [] - received exception in topic listener: {}
jakarta.jms.JMSException: ActiveMQDisconnectedException[errorType=DISCONNECTED message=AMQ219015: The connection was disconnected because of server shutdown]
        at org.apache.activemq.artemis.jms.client.ActiveMQConnection$JMSFailureListener.connectionFailed(ActiveMQConnection.java:724) ~[artemis-jakarta-client-2.33.0.jar:2.33.0]
        at org.apache.activemq.artemis.jms.client.ActiveMQConnection$JMSFailureListener.connectionFailed(ActiveMQConnection.java:745) ~[artemis-jakarta-client-2.33.0.jar:2.33.0]
        at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.callSessionFailureListeners(ClientSessionFactoryImpl.java:878) ~[artemis-core-client-2.33.0.jar:2.33.0]
        at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.callSessionFailureListeners(ClientSessionFactoryImpl.java:866) ~[artemis-core-client-2.33.0.jar:2.33.0]
        at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.failoverOrReconnect(ClientSessionFactoryImpl.java:812) ~[artemis-core-client-2.33.0.jar:2.33.0]
        at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.handleConnectionFailure(ClientSessionFactoryImpl.java:576) ~[artemis-core-client-2.33.0.jar:2.33.0]
        at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingFailureListener.connectionFailed(ClientSessionFactoryImpl.java:1417) ~[artemis-core-client-2.33.0.jar:2.33.0]
        at org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection.callFailureListeners(AbstractRemotingConnection.java:98) ~[artemis-core-client-2.33.0.jar:2.33.0]
        at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.fail(RemotingConnectionImpl.java:209) ~[artemis-core-client-2.33.0.jar:2.33.0]
        at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$CloseRunnable.run(ClientSessionFactoryImpl.java:1182) ~[artemis-core-client-2.33.0.jar:2.33.0]
        at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:57) ~[artemis-commons-2.33.0.jar:2.33.0]
        at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32) ~[artemis-commons-2.33.0.jar:2.33.0]
        at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68) ~[artemis-commons-2.33.0.jar:2.33.0]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
        at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) ~[artemis-commons-2.33.0.jar:2.33.0]
Caused by: org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException: AMQ219015: The connection was disconnected because of server shutdown
        ... 7 more


2024-07-29 11:32:29,351 [Thread-6] WARN   d.d.m.j.ExampleListener [] - Exception is not an instance of ActiveMQDisconnectedException

Here’s the implementation of our ExceptionListener:

public class ExampleListener implements ExceptionListener {

    private static final Logger LOG = LoggerFactory.getLogger(ExampleListener.class);

    @Override
    public void onException(final JMSException exception) {
        LOG.info("Received exception in topic listener: {}", exception);

        if (exception.getLinkedException() instanceof ActiveMQDisconnectedException) {
            // This condition is always false.
            ActiveMQDisconnectedException e = (ActiveMQDisconnectedException) exception.getLinkedException();
            LOG.info("Disconnected exception type: {}", e.getType());
            LOG.info("Disconnected message type: {}", e.getMessage());
            // Handle reconnection logic here
        } else {
            LOG.warn("Exception is not an instance of ActiveMQDisconnectedException");
        }
    }
}
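
The stack trace above ends with a "Caused by: ... ActiveMQDisconnectedException" line, which suggests that the broker exception is attached as the cause of the JMSException rather than as its linked exception. If that reading is right, it would explain why the getLinkedException() check never matches. The following is a minimal sketch under that assumption: a small helper that walks the cause chain and returns the first throwable of a requested type. CauseChain and find are hypothetical names, not part of Artemis or JMS, and the helper uses only the standard library.

```java
import java.util.Optional;

/** Hypothetical helper for inspecting the cause chain of an exception. */
final class CauseChain {

    private CauseChain() {
    }

    /**
     * Returns the first throwable in the chain, starting with {@code top} itself,
     * that is an instance of {@code type}, or an empty Optional if none matches.
     */
    static <T extends Throwable> Optional<T> find(Throwable top, Class<T> type) {
        Throwable current = top;
        // Bound the walk so that a cyclic cause chain cannot loop forever.
        for (int depth = 0; current != null && depth < 32; depth++) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }
}
```

In the listener, the instanceof check could then become something like CauseChain.find(exception, ActiveMQDisconnectedException.class).isPresent(), again assuming the cause is populated as the stack trace indicates.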

Questions:

  1. How can we handle reconnecting if the exception listener is only called after the server is restarted?
  2. How can we programmatically detect which server is down and attempt reconnection to the other server, similar to the failover: behavior in ActiveMQ Classic?
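
To make the second question concrete, application-level failover can be sketched as a loop that tries each broker in a fixed order and returns the first connection that opens, which roughly mirrors failover: with randomize=false. This is only an illustration under stated assumptions: FailoverConnector, maxRounds and delayMillis are hypothetical names, and each Callable stands in for whatever code opens a connection against one broker (for example, creating a connection from a factory that points at a single URL).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * Sketch of application-level failover: tries each candidate in order and
 * returns the first connection that opens. All names are hypothetical.
 */
final class FailoverConnector<C> {

    private final List<Callable<C>> candidates;
    private final int maxRounds;
    private final long delayMillis;

    FailoverConnector(List<Callable<C>> candidates, int maxRounds, long delayMillis) {
        if (candidates.isEmpty() || maxRounds < 1 || delayMillis < 0) {
            throw new IllegalArgumentException("need at least one candidate, one round, and a non-negative delay");
        }
        this.candidates = new ArrayList<>(candidates);
        this.maxRounds = maxRounds;
        this.delayMillis = delayMillis;
    }

    /** Returns the first connection that opens, or throws if every round fails. */
    C connect() {
        Exception last = null;
        for (int round = 0; round < maxRounds; round++) {
            for (Callable<C> candidate : candidates) {
                try {
                    return candidate.call();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while connecting", e);
                } catch (Exception e) {
                    last = e; // remember the failure and try the next broker
                }
            }
            if (round < maxRounds - 1) {
                pause(); // wait before starting the next round
            }
        }
        throw new IllegalStateException("All brokers unreachable after " + maxRounds + " round(s)", last);
    }

    private void pause() {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

An ExceptionListener could call connect() again once it has seen a disconnect and then recreate its sessions and consumers on the new connection. Doing that re-creation correctly is the hard part and is not covered by this sketch.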

Solution

  • The ActiveMQ Artemis Core JMS client can be configured to reconnect automatically to the same server, to a backup server, or to other active servers when it detects a failure in the connection between the client and the server.

    To reconnect to other active servers, set failoverAttempts to any non-zero value and set connectionLoadBalancingPolicyClassName to the round-robin policy, for example:

    import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
    
    public ConnectionFactory connectionFactory(String user, String credential) {
        ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory("(tcp://server1:port1,tcp://server2:port2)?jms.useAsyncSend=true&initialReconnectDelay=100&timeout=2000&sslEnabled=false&failoverAttempts=-1&useTopologyForLoadBalancing=false&connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy", user, credential);
        return factory;
    }
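
    Long connection URLs like the one above are easy to get wrong by hand. As an optional convenience, the URL can be assembled from a server list and a parameter map. This is a minimal sketch using only the standard library; BrokerUrlBuilder and build are hypothetical names, and the helper does no URL encoding or validation of parameter names.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that assembles a "(uri1,uri2)?key=value&..." connection URL. */
final class BrokerUrlBuilder {

    private BrokerUrlBuilder() {
    }

    static String build(List<String> servers, Map<String, String> params) {
        // Server URIs are comma-separated and wrapped in parentheses.
        StringJoiner hosts = new StringJoiner(",", "(", ")");
        for (String server : servers) {
            hosts.add(server);
        }
        if (params.isEmpty()) {
            return hosts.toString();
        }
        // Parameters follow as an ordinary query string, in map iteration order.
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            query.add(entry.getKey() + "=" + entry.getValue());
        }
        return hosts + "?" + query;
    }
}
```

    Passing a LinkedHashMap keeps the parameters in insertion order, which makes the resulting URL predictable in logs and tests.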