I'm trying to divert a multicast address to anycast address and bridge the anycast data to other servers. Fortunately, there is also such an example in divert.
After everything is ready, I set the JmsTemplate PubSubDomain to true and send a message to multicast. The Queue under multicast can receive the message normally, but divert does not seem to work because the bridge server will not receive the message.
If I change the way, that is, use artemisClient to send messages, everything works fine, multicast. The queue and bridge server will receive the message. The only bad thing is that the web-console cannot preview the message text message, and will get the prompt "Unsupported message body type which cannot be displayed by hawtio." Is there a better solution?
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core">
<name>subscribe-embedded-artemis</name>
<persistence-enabled>true</persistence-enabled>
<journal-type>MAPPED</journal-type>
<paging-directory>prinect-data-flow/activemq-artemis/data/paging</paging-directory>
<bindings-directory>prinect-data-flow/activemq-artemis/data/bindings</bindings-directory>
<journal-directory>prinect-data-flow/activemq-artemis/data/journal</journal-directory>
<large-messages-directory>prinect-data-flow/activemq-artemis/data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>40000</journal-buffer-timeout>
<journal-max-io>4096</journal-max-io>
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>1352000</page-sync-timeout>
<connectors>
<connector name="remote-connector">tcp://localhost:61617</connector>
</connectors>
<acceptors>
<acceptor name="netty">tcp://0.0.0.0:61616?protocols=AMQP,CORE</acceptor>
</acceptors>
<diverts>
<divert name="multicastToAnycastDivert">
<routing-name>toAnycast</routing-name>
<address>dataSubTopic</address>
<forwarding-address>forwardMessageToCloud</forwarding-address>
<exclusive>false</exclusive>
</divert>
</diverts>
<bridges>
<bridge name="my-bridge">
<queue-name>forwardMessageToCloud</queue-name>
<forwarding-address>remotePrinectDataForward</forwarding-address>
<reconnect-attempts>-1</reconnect-attempts>
<static-connectors>
<connector-ref>remote-connector</connector-ref>
</static-connectors>
</bridge>
</bridges>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createAddress" roles="guest"/>
<permission type="deleteAddress" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="browse" roles="guest"/>
<permission type="send" roles="guest"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<address-setting match="#">
<!-- default is 1.0 -->
<redelivery-delay-multiplier>1.5</redelivery-delay-multiplier>
<!-- default is 0 (no delay) -->
<redelivery-delay>5000</redelivery-delay>
<!-- default is 0.0) -->
<redelivery-collision-avoidance-factor>0.15</redelivery-collision-avoidance-factor>
<!-- default is redelivery-delay * 10 -->
<max-redelivery-delay>50000</max-redelivery-delay>
<dead-letter-address>DLA</dead-letter-address>
<max-delivery-attempts>3</max-delivery-attempts>
<auto-create-dead-letter-resources>true</auto-create-dead-letter-resources>
<dead-letter-queue-prefix/>
<dead-letter-queue-suffix>.DLQ</dead-letter-queue-suffix>
<expiry-address>expiryAddress</expiry-address>
<auto-create-expiry-resources>true</auto-create-expiry-resources>
<expiry-queue-prefix/>
<expiry-queue-suffix>.EXP</expiry-queue-suffix>
</address-setting>
</address-settings>
<addresses>
<address name="DLA">
<anycast>
<queue name="DLA"/>
</anycast>
</address>
<address name="dataSubTopic">
<multicast>
<queue name="dataSubQueue1"/>
<queue name="dataSubQueue2"/>
</multicast>
</address>
<address name="forwardMessageToCloud">
<anycast>
<queue name="forwardMessageToCloud"/>
</anycast>
</address>
</addresses>
</core>
public void jmsSend(String message) {
jmsTemplate.setPubSubDomain(true);
jmsTemplate.convertAndSend("dataSubTopic", message);
}
public void clientSend(String message) {
try (ClientSession session = artemisServerLocatorConfig.serverLocator()
.createSessionFactory()
.createSession()) {
session.start();
ClientProducer producer = session.createProducer("dataSubTopic");
ClientMessage clientMessage = session.createMessage(true);
clientMessage.setType(Message.TEXT_TYPE);
clientMessage.getBodyBuffer()
.writeString(message);
producer.send(clientMessage);
} catch (Exception e) {
log.error("message = {}", e.getMessage());
}
}
I did a quick proof-of-concept with 2 instances of ActiveMQ Artemis 2.38.0 as well as 2 instances of 2.19.1. On the first broker instance I added the following to broker.xml
based on your config:
...
<connectors>
<connector name="remote-connector">tcp://localhost:61617</connector>
</connectors>
...
<diverts>
<divert name="multicastToAnycastDivert">
<routing-name>toAnycast</routing-name>
<address>dataSubTopic</address>
<forwarding-address>forwardMessageToCloud</forwarding-address>
<exclusive>false</exclusive>
</divert>
</diverts>
<bridges>
<bridge name="my-bridge">
<queue-name>forwardMessageToCloud</queue-name>
<forwarding-address>remotePrinectDataForward</forwarding-address>
<reconnect-attempts>-1</reconnect-attempts>
<static-connectors>
<connector-ref>remote-connector</connector-ref>
</static-connectors>
</bridge>
</bridges>
...
<addresses>
...
<address name="dataSubTopic">
<multicast>
<queue name="dataSubQueue1"/>
<queue name="dataSubQueue2"/>
</multicast>
</address>
<address name="forwardMessageToCloud">
<anycast>
<queue name="forwardMessageToCloud"/>
</anycast>
</address>
</addresses>
...
On the second broker instance I added the following in broker.xml
:
<addresses>
...
<address name="remotePrinectDataForward">
<anycast>
<queue name="remotePrinectDataForward"/>
</anycast>
</address>
</addresses>
I started both brokers and then used the producer
command to send a JMS message to the topic dataSubTopic
on the first broker, i.e.:
./artemis producer --destination topic://dataSubTopic --message-count 1
This message was successfully diverted to forwardMessageToCloud
and sent across the bridge to remotePrinectDataForward
on the second broker.
Based on this it's not clear to me what might be going wrong in your use-case with Spring. Everything appears like it should be working normally as indeed it does in my test.
Regarding the "Unsupported message body type" issue you're seeing on the console when sending the message with the Core client I recommend you send a nullable SimpleString
instead, e.g.:
ClientProducer producer = session.createProducer("dataSubTopic");
ClientMessage clientMessage = session.createMessage(true);
clientMessage.setType(Message.TEXT_TYPE);
clientMessage.getBodyBuffer().writeNullableSimpleString(SimpleString.of(message));
producer.send(clientMessage);