javaspringspring-xd

Spring XD unable to find class definition for RabbitMQ ConnectionFactory


I am in the process of switching from xd-singlenode mode to distributed, using version 1.3.1-RELEASE.

I am running Spring XD in a docker container, with a Dockerfile similar to the one here. According to the XD guide, if I want to use RabbitMQ as my data transport, then I need to configure a servers.yml file. I have RabbitMQ running in a separate docker container, and I link these two containers together as part of a docker composition.

In short, I have configured everything (I believe correctly), but when I go to run multiple streams in Spring XD (which is when RabbitMQ would come into play, since it will act as a message transport between containers) Spring XD throws the following exception:

java.lang.NoClassDefFoundError: org/springframework/amqp/rabbit/connection/ConnectionFactory
    at java.lang.Class.getDeclaredConstructors0(Native Method) ~[na:1.8.0_131]
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) ~[na:1.8.0_131]
    at java.lang.Class.getConstructors(Class.java:1651) ~[na:1.8.0_131]
    at org.springframework.boot.BeanDefinitionLoader.isComponent(BeanDefinitionLoader.java:276) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
    at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:158) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
    at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:135) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
    at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:127) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
    at org.springframework.boot.SpringApplication.load(SpringApplication.java:615) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
    at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:139) [spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
    at org.springframework.xd.dirt.plugins.spark.streaming.MessageBusConfiguration.createApplicationContext(MessageBusConfiguration.java:86) [spring-xd-dirt-1.3.1.RELEASE.jar:1.3.1.RELEASE]
    at org.springframework.xd.dirt.plugins.spark.streaming.MessageBusSender.start(MessageBusSender.java:105) [spring-xd-dirt-1.3.1.RELEASE.jar:1.3.1.RELEASE]
    at org.springframework.xd.spark.streaming.java.ModuleExecutor$1$1.call(ModuleExecutor.java:58) [spring-xd-spark-streaming-1.3.1.RELEASE.jar:1.3.1.RELEASE]
    at org.springframework.xd.spark.streaming.java.ModuleExecutor$1$1.call(ModuleExecutor.java:53) [spring-xd-spark-streaming-1.3.1.RELEASE.jar:1.3.1.RELEASE]
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:206) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:206) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.Task.run(Task.scala:64) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) [spark-core_2.10-1.3.1.jar:1.3.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: java.lang.ClassNotFoundException: org.springframework.amqp.rabbit.connection.ConnectionFactory
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_131]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_131]
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) ~[na:1.8.0_131]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_131]
    ... 26 common frames omitted

So, what gives? Does Spring XD not come with the necessary RabbitMQ dependencies?

I've even tried manually adding the spring-messaging jar into /opt/spring-xd/xd/lib which contains the dependency without success. What's going on here?

Edit: I'll throw in my servers.yml file for extra context. Maybe someone can see where I may have done something wrong:

spring:
  profiles: container
xd:
  transport: rabbit
  messagebus:
    local:
      queueSize:                   2147483647
      polling:                     1000
      executor:
        corePoolSize:              0
        maxPoolSize:               200
        queueSize:                 2147483647
        keepAliveSeconds:          60
    rabbit:
      compressionLevel:            1
            # bus-level property, applies only when 'compress=true' for a stream module
            # See java.util.zip.Deflater; 1=BEST_SPEED, 9=BEST_COMPRESSION, ...
      longStringLimit:             8192
            # Headers longer than this will not be converted to String and will be a
            # DataInputStream - such headers will NOT be properly converted back on output.
      default:
        ackMode:                   AUTO
             # Valid: AUTO (container acks), NONE (broker acks), MANUAL (consumer acks).
             # Upper case only.
             # Note: MANUAL requires specialized code in the consuming module and is unlikely to be
             # used in an XD application. For more information, see
             # http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack
        autoBindDLQ:               false
        backOffInitialInterval:    1000
        backOffMaxInterval:        10000
        backOffMultiplier:         2.0
        batchBufferLimit:          10000
        batchingEnabled:           false
        batchSize:                 100
        batchTimeout:              5000
        compress:                  false
        concurrency:               1
        deliveryMode:              PERSISTENT
        durableSubscription:       false
        maxAttempts:               3
        maxConcurrency:            1
        prefix:                    xdbus.
             # prefix for queue/exchange names so policies (ha, dle etc.) can be applied
        prefetch:                  1
        replyHeaderPatterns:       STANDARD_REPLY_HEADERS,*
        republishToDLQ:            false
             # When false, normal rabbitmq dlq processing; when true, republish to the DLQ with stack trace
        requestHeaderPatterns:     STANDARD_REQUEST_HEADERS,*
        requeue:                   true
        transacted:                false
        txSize:                    1

#    redis:
#      headers:
            # comma-delimited list of additional header names to transport
#      default:
            # default bus properties, if not specified at the module level
#        backOffInitialInterval:    1000
#        backOffMaxInterval:        10000
#        backOffMultiplier:         2.0
#        concurrency:               1
#        maxAttempts:               3
    kafka:
      brokers:                                 kafka:9092
      zkAddress:                               zookeeper:2181
      mode:                                    embeddedHeaders
      offsetManagement:                        kafkaTopic
      headers:
             # comma-delimited list of additional header names to transport
      socketBufferSize:                        2097152
      offsetStoreTopic:                        SpringXdOffsets
      offsetStoreSegmentSize:                  25000000
      offsetStoreRetentionTime:                60000
      offsetStoreRequiredAcks:                 1
      offsetStoreMaxFetchSize:                 1048576
      offsetStoreBatchBytes:                   16384
      offsetStoreBatchTime:                    1000
      offsetUpdateTimeWindow:                  10000
      offsetUpdateCount:                       0
      offsetUpdateShutdownTimeout:             2000
      default:
        batchSize:                 16384
        batchTimeout:              0
        replicationFactor:         1
        concurrency:               1
        requiredAcks:              1
        compressionCodec:          none
        queueSize:                 8192 # must be a power of 2
        maxWait:                   100
        fetchSize:                 1048576
        minPartitionCount:         1
        durableSubscription:       false
        syncProducer:              false
        syncProducerTimeout:       5000
---
#Config for admin
spring:
  profiles: admin
xd:
  transport: rabbit
  messagebus:
    local:
      queueSize:                   2147483647
      polling:                     1000
      executor:
        corePoolSize:              0
        maxPoolSize:               200
        queueSize:                 2147483647
        keepAliveSeconds:          60
    rabbit:
      compressionLevel:            1
            # bus-level property, applies only when 'compress=true' for a stream module
            # See java.util.zip.Deflater; 1=BEST_SPEED, 9=BEST_COMPRESSION, ...
      longStringLimit:             8192
            # Headers longer than this will not be converted to String and will be a
            # DataInputStream - such headers will NOT be properly converted back on output.
      default:
        ackMode:                   AUTO
             # Valid: AUTO (container acks), NONE (broker acks), MANUAL (consumer acks).
             # Upper case only.
             # Note: MANUAL requires specialized code in the consuming module and is unlikely to be
             # used in an XD application. For more information, see
             # http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack
        autoBindDLQ:               false
        backOffInitialInterval:    1000
        backOffMaxInterval:        10000
        backOffMultiplier:         2.0
        batchBufferLimit:          10000
        batchingEnabled:           false
        batchSize:                 100
        batchTimeout:              5000
        compress:                  false
        concurrency:               1
        deliveryMode:              PERSISTENT
        durableSubscription:       false
        maxAttempts:               3
        maxConcurrency:            1
        prefix:                    xdbus.
             # prefix for queue/exchange names so policies (ha, dle etc.) can be applied
        prefetch:                  1
        replyHeaderPatterns:       STANDARD_REPLY_HEADERS,*
        republishToDLQ:            false
             # When false, normal rabbitmq dlq processing; when true, republish to the DLQ with stack trace
        requestHeaderPatterns:     STANDARD_REQUEST_HEADERS,*
        requeue:                   true
        transacted:                false
        txSize:                    1

#    redis:
#      headers:
            # comma-delimited list of additional header names to transport
#      default:
            # default bus properties, if not specified at the module level
#        backOffInitialInterval:    1000
#        backOffMaxInterval:        10000
#        backOffMultiplier:         2.0
#        concurrency:               1
#        maxAttempts:               3
    kafka:
      brokers:                                 kafka:9092
      zkAddress:                               zookeeper:2181
      mode:                                    embeddedHeaders
      offsetManagement:                        kafkaTopic
      headers:
             # comma-delimited list of additional header names to transport
      socketBufferSize:                        2097152
      offsetStoreTopic:                        SpringXdOffsets
      offsetStoreSegmentSize:                  25000000
      offsetStoreRetentionTime:                60000
      offsetStoreRequiredAcks:                 1
      offsetStoreMaxFetchSize:                 1048576
      offsetStoreBatchBytes:                   16384
      offsetStoreBatchTime:                    1000
      offsetUpdateTimeWindow:                  10000
      offsetUpdateCount:                       0
      offsetUpdateShutdownTimeout:             2000
      default:
        batchSize:                 16384
        batchTimeout:              0
        replicationFactor:         1
        concurrency:               1
        requiredAcks:              1
        compressionCodec:          none
        queueSize:                 8192 # must be a power of 2
        maxWait:                   100
        fetchSize:                 1048576
        minPartitionCount:         1
        durableSubscription:       false
        syncProducer:              false
        syncProducerTimeout:       5000

---
#  Rabbit MQ Properties
#
#  NOTE: sslProperties is mutually exclusive with keyStore, keyStorePassphrase, trustStore, trustStorePassphrase.
#  if you set inline properties, values in the properties file location given by 'sslProperties' will be ignored.
#
spring:
  rabbitmq:
   addresses: my-domain:5672
   adminAddresses: http://my-domain:15672
   nodes: rabbit@my-domain
   username: myusername
   password: mypassword
   virtual_host: /
   useSSL: false
   sslProperties:
   ssl:
     keyStore:
     keyStorePassphrase:
     trustStore:
     trustStorePassphrase:

EDIT 2: It looks like the xd-admin and xd-container scripts do not add the messagebus jars to the $CLASSPATH upon the start of AdminServerApplication or ContainerServerApplication. I'm not sure if this is the culprit yet.


Solution

  • The Spring XD startup scripts for xd-admin and xd-container do not add any of the messagebus jars to the classpath before starting their respective applications. They only add jars sitting directly in /lib and the hadoop jars in lib/hadoop27/.

    I was able to resolve this by modifying the scripts to include the necessary jars on the classpath:

    RABBIT_LIB=$APP_HOME/lib/messagebus/rabbit
    if [ -d "$RABBIT_LIB" ]; then
      for k in "$RABBIT_LIB"/*.jar; do
        CLASSPATH="$CLASSPATH":"$k"
      done
    fi