I config the ActiveMQ Classic with virtual destination.
When I test the config with python-stomp the virtual topic setting never works for me.
I am using Apache ActiveMQ Classic 5.18.2, and below is my activemq.xml
:
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
<compositeQueue name="MY.QUEUE">
<forwardTo>
<queue physicalName="FOO" />
<queue physicalName="BAR" />
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<import resource="jetty.xml"/>
</beans>
I am using Python stomp as publisher and subscriber.
import time
import stomp
conn = stomp.Connection()
conn.connect('admin', 'admin', wait=True)
conn.send(body='123123', destination='VirtualTopic.test')
time.sleep(2)
conn.disconnect()
import time
import stomp
class MyListener(stomp.ConnectionListener):
def on_error(self, message):
print('received an error "%s"' % message.body)
def on_message(self, frame):
print('received a message "%s"' % frame)
conn = stomp.Connection()
conn.set_listener('', MyListener())
while True:
conn.connect('admin', 'admin', wait=True)
conn.subscribe(destination='VirtualTopicConsumers.A.VirtualTopic.test', id="1", ack='auto')
time.sleep(3)
conn.disconnect()
I start the subscriber and then publish the message, but my subscriber never get the message.
How should I fix this?
In ActiveMQ Classic you need to prefix STOMP destinations with the type you want Topic or Queue otherwise the broker defaults to treating then as a Queue therefore your code likely fails on the send as the broker would think you are sending to a queue named "VirtualTopic.test" instead of the Topic you wanted.
Refer to the documentation for more information