javaactivemq-artemis

ActiveMQ multicast topic does round-robin instead


I'm trying to implement a pub-sub with embedded ActiveMQ Artemis 2.40.0. I create the address and write messages to it, but I receive the messages on different clients in a round-robin fashion.

My broker.xml:

<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="urn:activemq" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
    <core xmlns="urn:activemq:core">
        <persistence-enabled>false</persistence-enabled>
        <security-enabled>false</security-enabled>
        <acceptors>
            <acceptor name="in-vm">vm://0</acceptor>
        </acceptors>

        <addresses>
            <address name="address1">
                <multicast>
                    <queue name="q1" max-consumers="99">
                        <durable>false</durable>
                    </queue>
                </multicast>
            </address>
        </addresses>
    </core>
</configuration>

My code:

import com.shorcan.log.LogUtil;
import com.shorcan.util.SleepUtil;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.*;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Timer;
import java.util.TimerTask;

public class Example {
    private static final Logger log_ = LoggerFactory.getLogger(Example.class);

    public Example() throws Exception {
        EmbeddedActiveMQ broker = new EmbeddedActiveMQ();
        broker.setConfigResourcePath("file:cfg/broker.xml");
        broker.start();
    }

     public void createProducer() throws Exception {
        ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
        ClientSessionFactory factory = locator.createSessionFactory();
        ClientSession session = factory.createSession();
        session.start();

        ClientProducer producer = session.createProducer("address1");

        TimerTask task = new TimerTask() {
            public void run() {
                log_.info("Sending message...");
                ClientMessage message = session.createMessage(false);
                message.getBodyBuffer().writeString("Hi, There");
                try {
                    producer.send(message);
                } catch (ActiveMQException e) {
                    log_.warn("Error sending message: " + e.getMessage());
                }
            }
        };
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(task, 5000, 5000);
    }

    public void createConsumer(int id) throws Exception {
        ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
        ClientSessionFactory factory = locator.createSessionFactory();
        ClientSession session = factory.createSession();
        session.start();
        ClientConsumer consumer = session.createConsumer("q1");
        consumer.setMessageHandler(new MessageHandler() {
            @Override
            public void onMessage(ClientMessage clientMessage) {
                log_.info(">>> consumer {} received {}", id, clientMessage);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        LogUtil.initFrom("cfg/logging.properties");
        Example example = new Example();

        SleepUtil.waitFor(5);
        example.createProducer();

        example.createConsumer(0);
        example.createConsumer(1);
        example.createConsumer(2);
    }
}

When I run this, I get a message on a different consumer every 5s (0, 1, 2, 0, 1, 2, etc)

Can anyone point out where I've gone wrong?


Solution

  • Correct me if I'm wrong, but you are using a multicast address and defining a singular queue q1 under that. This unfortunately would put you into a competing consumer situation.

    Update your broker.xml thusly (note <multicast /> with no defined queue)

    <configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="urn:activemq" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
        <core xmlns="urn:activemq:core">
            <persistence-enabled>false</persistence-enabled>
            <security-enabled>false</security-enabled>
            <acceptors>
                <acceptor name="in-vm">vm://0</acceptor>
            </acceptors>
    
            <addresses>
                <address name="address1">
                    <multicast />
                </address>
            </addresses>
        </core>
    </configuration>
    

    In your create client segment you have to do a little bit of trickery like this:

    import java.util.UUID;
    
    public class Example {
        public void createConsumer(int id) throws Exception {
            ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
            ClientSessionFactory factory = locator.createSessionFactory();
            ClientSession session = factory.createSession();
            session.start();
    
            String consumerQueueName = "address1-"+UUID.randomUUID();
            QueueConfiguration topicQueueConfiguration =
                QueueConfiguration.of(consumerQueueName)
                                    .setAddress("address1")
                                    .setDurable(false)
                                    .setTransient(true)
                                    .setRoutingType(RoutingType.MULTICAST);
            session.createQueue(topicQueueConfiguration);
                                    
    
            ClientConsumer consumer = session.createConsumer(consumerQueueName);
            consumer.setMessageHandler(new MessageHandler() {
                @Override
                public void onMessage(ClientMessage clientMessage) {
                    log_.info(">>> consumer {} received {}", id, clientMessage);
                }
            });
        }
    }
    
    

    It has been a hot minute since I last did this, however you have to dynamically create an ANYCAST queue attached to the address that is transient and will represent a specific client listening. It should auto-delete upon closure of the client session.

    The addressing used by artemis is more or less this:

    You have an address which is a destination that you can send messages to. You have the option of routing out as ANYCAST (traditional point-to-point) or MULTICAST (traditional pub-sub).

    Each client that wants to get a DISTINCT message must have a DISTINCT message queue to receive on. That is why I had to define the QueueConfiguration with a UUID and that SHOULD solve your issue.

    If you are doing this in a production targeted application, you will need to do some more work on ensuring deletion of the queue associated with the consumer on termination, otherwise the queue will persist as a multicast member which may degrade performance over time.

    Hopefully this will address your situation.