I have an ActiveMQ Artemis server which works fine. I now would like to enable AMQP on the server for a node application using pub-sub. While my node pub subs are able to connect to it they neither publish or receive even though the connections are valid. Why is my pub not sending & why is my sub not receiving?
I am following the publisher and subscriber examples for AMQP Rhea on Github. They both can connect to localhost:5672
.
Below is my ActiveMQ Artemis JMS server implementation. Note that I add 2 configs with addAcceptorConfiguration
(one for Artemis and another for AMQP's default ports).
//this is my jmsserver
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package testSupport.artemis.server;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
public class JMSServer {
private static int qId = 0;
private ActiveMQServer server;
private String errMsg = "";
/**
* Factory method to create an instance of a JMS Server
* @param topics list of topics to add to the server
* @return
*/
public static JMSServer createJMSServer(List<String> topics) {
JMSServer s = new JMSServer();
s.start();
if (topics != null) {
s.setTopics(topics);
}
return s;
}
/**
* Factory method to create an instance of a JMS Server
* @return
*/
public static JMSServer createJMSServer() {
return createJMSServer(null);
}
/**
* Updates the server config with settings required to connect invm or from
* another process on localhost
*
* @param config
*/
public static void updateConfig(Configuration config) {
try {
config.setPersistenceEnabled(false)
.setSecurityEnabled(false)
.addAcceptorConfiguration("tcp", "tcp://localhost:61616")
.addAcceptorConfiguration("amqp", "tcp://localhost:5672");
// SpringBoot may have already created an invm connector at 0, so only add one if it doesnt exist
if (config.getAcceptorConfigurations().stream().noneMatch(poo -> poo.getParams().containsKey("serverId") &&
Integer.valueOf(poo.getParams().get("serverId").toString()) == 0)) {
config.addAcceptorConfiguration("invm", "vm://0");
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* Default Constructor
*/
public JMSServer() {
try {
Configuration config = new ConfigurationImpl();
updateConfig(config);
server = ActiveMQServers.newActiveMQServer(config);
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
ex.printStackTrace();
}
}
/**
* Start the JMS Server
* @return
*/
public boolean start() {
boolean success = false;
try {
server.start();
for (int i = 0; i < 50; i++) {
Thread.sleep(100);
if (server.isActive()) {
success = true;
break;
}
}
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
ex.printStackTrace();
}
return success;
}
/**
* Stop the JMS Server
*/
public void stop() {
try {
server.stop();
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
ex.printStackTrace();
}
}
/**
* Get the Error Message
* @return
*/
public String getErrMsg() {
return errMsg;
}
/**
* Set a list of topics to add to this server
* @param topics
* @return
*/
public boolean setTopics(List<String> topics) {
boolean success = true;
if (!server.isActive()) {
errMsg = "Topics cannot be set until the server has been started.";
return false;
}
// add the topics
for (String t : topics) {
try {
SimpleString addr = SimpleString.toSimpleString(t);
QueueConfig qcfg = QueueConfig.builderWith(qId, addr, addr).autoCreated(false)
.autoDelete(false)
.durable(true)
.build();
server.getQueueFactory().createQueueWith(qcfg);
server.addAddressInfo(new AddressInfo(addr, RoutingType.MULTICAST));
qId++;
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
success = false;
}
}
return success;
}
}
Update:
So comments suggest I had to add artemis-amqp-protocol
to the classpath. In gradle I made sure to include artemis-amqp-protocol
and artemis-jms-server
with the matching versions like so:
dependencies {
implementation 'org.apache.activemq:artemis-jms-server:2.22.0'
implementation 'org.apache.activemq:artemis-amqp-protocol:2.22.0'
// other dependencies here...
}
You must ensure that the artemis-amqp-protocol
jar is on your classpath otherwise the broker won't be able to support AMQP. If it is then you should see a log message that says:
AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP.
And then later you'll see something like:
AMQ221020: Started EPOLL Acceptor at localhost:5672 for protocols [AMQP].
If you don't see these (or something very close to these) then the broker won't support AMQP.