apache-camelapache-servicemixqpiddurable-subscription

Implementing Durable Subscription on ServiceMix


I currently am using ServiceMix to route messages using QPID libraries.

My working config is below :

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="
      http://www.osgi.org/xmlns/blueprint/v1.0.0
      http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd">

  <bean id="amqp" class="org.apache.camel.component.amqp.AMQPComponent">
    <property name="connectionFactory">
       <bean class="org.apache.qpid.jms.JmsConnectionFactory">
         <property name="remoteURI" value="failover:amqps://remote-broker:9551?transport.trustStoreLocation=/vault/QA_UM_A.jks />

       </bean>
    </property>
  </bean>

  <bean id="kubeActivemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://10.100.10.13:61616" />
  </bean>

  <route>
   <from uri="amqp:queue:///queue-1" />
   <to uri="kubeActivemq:local-queue?jmsMessageType=Text" />
  </route>

  </camelContext>
</blueprint>

The above config works.

Now I need to implement durable subscription so that if servicemix goes down, it is able to recover all messages when it comes back up (the ones sent when servicemix is down.)

Based on the documentation , I have implemented a route :

<route>
      <from uri="amqp:queue:///queue-1?clientId=1&amp;durableSubscriptionName=bar1" />
      <to uri="kubeActivemq:queue:///local-queue" />
</route>

But this implementation gives the ERROR :

2018-05-28 13:28:58,421 | ERROR | mix-7.0.0/deploy | BlueprintCamelContext            | 40 - org.apache.camel.camel-blueprint - 2.16.4 | Error occurred during starting Camel: CamelContext(camel-1) due A durable subscription requires a topic (pub-sub domain)
java.lang.IllegalArgumentException: A durable subscription requires a topic (pub-sub domain)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.validateConfiguration(AbstractMessageListenerContainer.java:435)
    at org.springframework.jms.listener.AbstractJmsListeningContainer.afterPropertiesSet(AbstractJmsListeningContainer.java:157)
    at org.apache.camel.component.jms.JmsConsumer.prepareAndStartListenerContainer(JmsConsumer.java:163)
    at org.apache.camel.component.jms.JmsConsumer.doStart(JmsConsumer.java:155)
    at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
    at org.apache.camel.impl.DefaultCamelContext.startService(DefaultCamelContext.java:3234)
    at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRouteConsumers(DefaultCamelContext.java:3528)
    at org.apache.camel.impl.DefaultCamelContext.doStartRouteConsumers(DefaultCamelContext.java:3464)
    at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:3394)
    at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRoutes(DefaultCamelContext.java:3162)
    at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3018)
.
.
.

Is there something Ive missed here ? Arent the topic names supposed to be same as queue names ?


Solution

  • Your route is instructing Camel to connect to a queue, and you need to change it to use a 'topic' for Durable Subscriptions. (Queue subscriptions are durable by default)

    amqp:queue:///queue-1...

    And Durable Subscriptions require a topic

    amqp:topic:///topic-1...