activemq-classicbroker

ActiveMQ consume/forward messages from another ActiveMQ instance


I have two brokers A and B. If I want to forward message from A to B everything is simple. I just need network connector in A broker configured like this:

<networkConnectors>
    <networkConnector staticBridge="true" userName="user" password="pass" uri="static://(tcp://B:61616)">
        <staticallyIncludedDestinations>
            <queue physicalName="QUEUE.TO.FORWARD.MESSAGE" />
        </staticallyIncludedDestinations>
    </networkConnector>
</networkConnectors>

I tought if I want to consume messageges from broker B from some other queue (let's name it QUEUE.TO.CONSUME) i just need do the same thing but with duplex set to true and just listen on QUEUE.TO.CONSUME on broker A like this:

<networkConnectors>
    <networkConnector name="from-B-to-A" staticBridge="true" duplex="true" userName="user" password="pass" uri="static://(tcp://B:61616)">
        <staticallyIncludedDestinations>
            <queue physicalName="QUEUE.TO.CONSUME" />
        </staticallyIncludedDestinations>
    </networkConnector>
    <networkConnector staticBridge="true" userName="user" password="pass" uri="static://(tcp://B:61616)">
        <staticallyIncludedDestinations>
            <queue physicalName="QUEUE.TO.FORWARD.MESSAGE" />
        </staticallyIncludedDestinations>
    </networkConnector>
</networkConnectors>

But it does not work as I expected. It seem that only every second message is forwared and the remaining are just lost. Suprisingly that creates two consumers on broker B QUEUE.TO.CONSUME and I assume that one of them consumes message without forwarding to broker A. How to create bridge on broker A that allows me consume messages from broker B without loosing messages. Creating network connector in broker B is not an option for now. I've also tried create inbound queue bridge like this:

<jmsBridgeConnectors>
    <jmsQueueConnector outboundQueueConnectionFactory="#remoteBroker" localUsername="user" localPassword="password">
        <inboundQueueBridges>
            <inboundQueueBridge inboundQueueName="QUEUE.TO.CONSUME" localQueueName="QUEUE.TO.CONSUME" />
        </inboundQueueBridges>
    </jmsQueueConnector>
</jmsBridgeConnectors>
...
</broker>
<bean id="remoteBroker" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="failover://(nio:B:61616)" />
        <property name="userName" value="user" />
        <property name="password" value="password" />
</bean>

This configuration creates consumer on remote broker B but it doesn't consume any messages which just hanging as enqueued and nothing happens. Broker A still doesn't receive any messages to its local queue.


Solution

  • Ok, I figure it out. I've just used embedded Apache Camel to define routing to remote host and it looks like this (camel.xml in conf directory):

    <beans
            xmlns="http://www.springframework.org/schema/beans"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xmlns:camel="http://camel.apache.org/schema/spring"
            xsi:schemaLocation="
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <camelContext id="context" xmlns="http://camel.apache.org/schema/spring">
            <route>
                <from uri="remoteBroker:queue:QUEUE.TO.CONSUME"/>
                <to uri="localBroker:queue:QUEUE.TO.CONSUME"/>
            </route>
        </camelContext>
    
        <bean id="remoteBroker" class="org.apache.activemq.camel.component.ActiveMQComponent">
            <property name="connectionFactory">
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL" value="tcp://B:61616"/>
                    <property name="userName" value="user"/>
                    <property name="password" value="password"/>
                </bean>
            </property>
        </bean>
    
        <bean id="localBroker" class="org.apache.activemq.camel.component.ActiveMQComponent">
            <property name="connectionFactory">
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL" value="vm://localhost"/>
                </bean>
            </property>
        </bean>
    
    </beans>
    

    where localhost i broker A. And in activemq.xml:

    <import resource="camel.xml"/>