javaamqpactivemq-artemis

How to configure ActiveMQ Artemis to use AMQP 1.0 and other protocols with Java


I have an ActiveMQ Artemis server which works fine. I now would like to enable AMQP on the server for a node application using pub-sub. While my node pub subs are able to connect to it they neither publish or receive even though the connections are valid. Why is my pub not sending & why is my sub not receiving?

I am following the publisher and subscriber examples for AMQP Rhea on Github. They both can connect to localhost:5672.

Below is my ActiveMQ Artemis JMS server implementation. Note that I add 2 configs with addAcceptorConfiguration (one for Artemis and another for AMQP's default ports).

//this is my jmsserver
/*
 * This Java source file was generated by the Gradle 'init' task.
 */
package testSupport.artemis.server;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;

public class JMSServer {
    private static int qId = 0;

    private ActiveMQServer server;
    private String errMsg = "";

    /**
     * Factory method to create an instance of a JMS Server
     * @param topics list of topics to add to the server
     * @return
     */
    public static JMSServer createJMSServer(List<String> topics) {
        JMSServer s = new JMSServer();
        s.start();
        if (topics != null) {
            s.setTopics(topics);
        }
        return s;
    }

    /**
     * Factory method to create an instance of a JMS Server
     * @return
     */
    public static JMSServer createJMSServer() {
        return createJMSServer(null);
    }

    /**
     * Updates the server config with settings required to connect invm or from
     * another process on localhost
     * 
     * @param config
     */
    public static void updateConfig(Configuration config) {
        try {
            config.setPersistenceEnabled(false)
                  .setSecurityEnabled(false)
                  .addAcceptorConfiguration("tcp", "tcp://localhost:61616")
                  .addAcceptorConfiguration("amqp", "tcp://localhost:5672");

            // SpringBoot may have already created an invm connector at 0, so only add one if it doesnt exist
            if (config.getAcceptorConfigurations().stream().noneMatch(poo -> poo.getParams().containsKey("serverId") &&
                    Integer.valueOf(poo.getParams().get("serverId").toString()) == 0)) {
                config.addAcceptorConfiguration("invm", "vm://0");
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Default Constructor
     */
    public JMSServer() {
        try {
            Configuration config = new ConfigurationImpl();
            updateConfig(config);
            server = ActiveMQServers.newActiveMQServer(config);
        } catch (Exception ex) {
            errMsg = ex + ": " + ex.getMessage();
            ex.printStackTrace();
        }
    }

    /**
     * Start the JMS Server
     * @return
     */
    public boolean start() {
        boolean success = false;
        try {
            server.start();

            for (int i = 0; i < 50; i++) {
                Thread.sleep(100);
                if (server.isActive()) {
                    success = true;
                    break;
                }
            }
        } catch (Exception ex) {
            errMsg = ex + ": " + ex.getMessage();
            ex.printStackTrace();
        }

        return success;
    }

    /**
     * Stop the JMS Server
     */
    public void stop() {
        try {
            server.stop();
        } catch (Exception ex) {
            errMsg = ex + ": " + ex.getMessage();
            ex.printStackTrace();
        }
    }

    /**
     * Get the Error Message
     * @return
     */
    public String getErrMsg() {
        return errMsg;
    }

    /**
     * Set a list of topics to add to this server
     * @param topics
     * @return
     */
    public boolean setTopics(List<String> topics) {
        boolean success = true;

        if (!server.isActive()) {
            errMsg = "Topics cannot be set until the server has been started.";
            return false;
        }

        // add the topics
        for (String t : topics) {
            try {
                SimpleString addr = SimpleString.toSimpleString(t);
                QueueConfig qcfg = QueueConfig.builderWith(qId, addr, addr).autoCreated(false)
                        .autoDelete(false)
                        .durable(true)
                        .build();

                server.getQueueFactory().createQueueWith(qcfg);
                server.addAddressInfo(new AddressInfo(addr, RoutingType.MULTICAST));
                qId++;
            } catch (Exception ex) {
                errMsg = ex + ": " + ex.getMessage();
                success = false;
            }
        }
        return success;
    }

}

Update: So comments suggest I had to add artemis-amqp-protocol to the classpath. In gradle I made sure to include artemis-amqp-protocol and artemis-jms-server with the matching versions like so:

dependencies {
    implementation 'org.apache.activemq:artemis-jms-server:2.22.0'
    implementation 'org.apache.activemq:artemis-amqp-protocol:2.22.0'
    // other dependencies here...
}

Solution

  • You must ensure that the artemis-amqp-protocol jar is on your classpath otherwise the broker won't be able to support AMQP. If it is then you should see a log message that says:

    AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP.

    And then later you'll see something like:

    AMQ221020: Started EPOLL Acceptor at localhost:5672 for protocols [AMQP].

    If you don't see these (or something very close to these) then the broker won't support AMQP.