I am trying to use an ActiveMQ Classic cluster and Virtual Topics to create a highly-available messaging system for inter-service communication inside a K8s cluster and have encountered a problem with starting connections in failover vs. non-failover mode.
I am using NMS v2.0.0 and NMS.ActiveMQ v2.0.1 (I have had other issues with the 2.1 releases of both).
I have code that starts a Task, which then spawns 4-5 sub-Tasks for my producer. Each sub-Task is responsible for creating the IConnection, ISession and IMessageProducer instances (with appropriate retries in Polly) using the connection string activemq:tcp://localhost:61616. This code works as expected.
Using the same code, with the only change being a failover connection string, activemq:failover:(activemq:tcp://localhost:61616,activemq:tcp://localhost:61617,activemq:tcp://localhost:61618), the call to StartAsync on the IConnection blocks when the tasks start.
I attempted to move the IConnection.StartAsync() call out into its own Task, but then the call to IConnection.CreateSessionAsync() blocks instead.
A (snipped) version of my code is below:
    foreach (var type in EventUtil.GetAllEventTypes())
    {
        var initialisationSignal = new ManualResetEventSlim();
        tasks.Add(new Tuple<Task, ManualResetEventSlim>(Task.Run(async () =>
        {
            IConnection connection = null;
            ISession session = null;
            IMessageProducer producer = null;
            try
            {
                IConnectionFactory factory = new NMSConnectionFactory(_options.ActiveMqUri);
                connection = await factory.CreateConnectionAsync();
                await connection.StartAsync();
                session = await connection.CreateSessionAsync();
                var topicName = $"topic://VirtualTopic.{ActiveMqOptions.TopicNamer(type)}";
                var destination = (ITopic)SessionUtil.GetDestination(session, topicName);
                _logger.LogInformation("Creating producer for Event topic {TopicName}", topicName);
                producer = await session.CreateProducerAsync(destination);
                await RunProducerTask(type, producer, session, serviceCancel.Token);
            }
            finally
            {
                await CloseQuietly(producer, _logger);
                await CloseQuietly(session, _logger);
                await CloseQuietly(connection, _logger);
            }
I tried to follow the guide for setting up failover, however using the failover:(activemq:tcp://localhost:61616,activemq:tcp://localhost:61617,activemq:tcp://localhost:61618) syntax generates an Apache.NMS.NMSConnectionException: No IConnectionFactory implementation found for connection URI: failover:(.
I can use the same code with a single-broker URI. I have tested it against each of the three brokers, and they all work as expected, with the calls to StartAsync() and CreateSessionAsync() not blocking.
Those APIs block when failover is in play because they cannot proceed until the client has connected to a broker and received the basic wire negotiation data from it. If the calls stall there, the failover transport has not managed to connect, which points to a problem with the URI you are using. The NMS clients are not very actively maintained, so you will most likely be on your own to fix and support them.
The simplest fix I can suggest is to skip the NMSConnectionFactory, with its not-so-great magic for finding transport providers, and go directly to the NMS.ActiveMQ library's implementation of IConnectionFactory. It doesn't require the 'activemq:' prefixes on the URI because it already knows what its providers are.
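As a rough sketch of that suggestion, something like the following should work. It assumes the three local broker ports from the question; the class and method names are hypothetical and only there for illustration. It has not been run against the asker's cluster, so treat it as a starting point rather than a confirmed fix.

    using System.Threading.Tasks;
    using Apache.NMS;
    using Apache.NMS.ActiveMQ;

    // Hypothetical helper class, not part of NMS.
    public static class FailoverConnectionSketch
    {
        // Assumption: the broker addresses from the question, written
        // without the "activemq:" prefixes on the outer or inner URIs.
        private const string BrokerUri =
            "failover:(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618)";

        public static async Task<IConnection> ConnectAsync()
        {
            // Use the NMS.ActiveMQ ConnectionFactory directly instead of
            // NMSConnectionFactory, so no provider lookup by URI scheme is needed.
            IConnectionFactory factory = new ConnectionFactory(BrokerUri);

            IConnection connection = await factory.CreateConnectionAsync();

            // Still waits until the failover transport reaches one of the brokers.
            await connection.StartAsync();
            return connection;
        }
    }

If StartAsync() still waits longer than you want when no broker is reachable, the NMS.ActiveMQ failover transport documents URI options such as transport.timeout and transport.startupMaxReconnectAttempts for bounding that wait; check them against the documentation for the version you are running before relying on them.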