I am in the process of switching from xd-singlenode
mode to distributed, using version 1.3.1-RELEASE
.
I am running Spring XD in a docker container, with a Dockerfile similar to the one here. According to the XD guide, if I want to use RabbitMQ as my data transport, then I need to configure a servers.yml
file. I have RabbitMQ running in a separate docker container, and I link these two containers together as part of a docker composition.
In short, I have configured everything (I believe correctly), but when I go to run multiple streams in Spring XD (which is when RabbitMQ would come into play, since it will act as a message transport between containers) Spring XD throws the following exception:
java.lang.NoClassDefFoundError: org/springframework/amqp/rabbit/connection/ConnectionFactory
at java.lang.Class.getDeclaredConstructors0(Native Method) ~[na:1.8.0_131]
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) ~[na:1.8.0_131]
at java.lang.Class.getConstructors(Class.java:1651) ~[na:1.8.0_131]
at org.springframework.boot.BeanDefinitionLoader.isComponent(BeanDefinitionLoader.java:276) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:158) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:135) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:127) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.SpringApplication.load(SpringApplication.java:615) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:139) [spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.xd.dirt.plugins.spark.streaming.MessageBusConfiguration.createApplicationContext(MessageBusConfiguration.java:86) [spring-xd-dirt-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.xd.dirt.plugins.spark.streaming.MessageBusSender.start(MessageBusSender.java:105) [spring-xd-dirt-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.xd.spark.streaming.java.ModuleExecutor$1$1.call(ModuleExecutor.java:58) [spring-xd-spark-streaming-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.xd.spark.streaming.java.ModuleExecutor$1$1.call(ModuleExecutor.java:53) [spring-xd-spark-streaming-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:206) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:206) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.scheduler.Task.run(Task.scala:64) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) [spark-core_2.10-1.3.1.jar:1.3.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: java.lang.ClassNotFoundException: org.springframework.amqp.rabbit.connection.ConnectionFactory
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_131]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_131]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) ~[na:1.8.0_131]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_131]
... 26 common frames omitted
So, what gives? Does Spring XD not come with the necessary RabbitMQ dependencies?
I've even tried manually adding the spring-messaging
jar into /opt/spring-xd/xd/lib
which contains the dependency without success. What's going on here?
Edit: I'll throw in my servers.yml
file for extra context. Maybe someone can see where I may have done something wrong:
spring:
profiles: container
xd:
transport: rabbit
messagebus:
local:
queueSize: 2147483647
polling: 1000
executor:
corePoolSize: 0
maxPoolSize: 200
queueSize: 2147483647
keepAliveSeconds: 60
rabbit:
compressionLevel: 1
# bus-level property, applies only when 'compress=true' for a stream module
# See java.util.zip.Deflater; 1=BEST_SPEED, 9=BEST_COMPRESSION, ...
longStringLimit: 8192
# Headers longer than this will not be converted to String and will be a
# DataInputStream - such headers will NOT be properly converted back on output.
default:
ackMode: AUTO
# Valid: AUTO (container acks), NONE (broker acks), MANUAL (consumer acks).
# Upper case only.
# Note: MANUAL requires specialized code in the consuming module and is unlikely to be
# used in an XD application. For more information, see
# http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack
autoBindDLQ: false
backOffInitialInterval: 1000
backOffMaxInterval: 10000
backOffMultiplier: 2.0
batchBufferLimit: 10000
batchingEnabled: false
batchSize: 100
batchTimeout: 5000
compress: false
concurrency: 1
deliveryMode: PERSISTENT
durableSubscription: false
maxAttempts: 3
maxConcurrency: 1
prefix: xdbus.
# prefix for queue/exchange names so policies (ha, dle etc.) can be applied
prefetch: 1
replyHeaderPatterns: STANDARD_REPLY_HEADERS,*
republishToDLQ: false
# When false, normal rabbitmq dlq processing; when true, republish to the DLQ with stack trace
requestHeaderPatterns: STANDARD_REQUEST_HEADERS,*
requeue: true
transacted: false
txSize: 1
# redis:
# headers:
# comma-delimited list of additional header names to transport
# default:
# default bus properties, if not specified at the module level
# backOffInitialInterval: 1000
# backOffMaxInterval: 10000
# backOffMultiplier: 2.0
# concurrency: 1
# maxAttempts: 3
kafka:
brokers: kafka:9092
zkAddress: zookeeper:2181
mode: embeddedHeaders
offsetManagement: kafkaTopic
headers:
# comma-delimited list of additional header names to transport
socketBufferSize: 2097152
offsetStoreTopic: SpringXdOffsets
offsetStoreSegmentSize: 25000000
offsetStoreRetentionTime: 60000
offsetStoreRequiredAcks: 1
offsetStoreMaxFetchSize: 1048576
offsetStoreBatchBytes: 16384
offsetStoreBatchTime: 1000
offsetUpdateTimeWindow: 10000
offsetUpdateCount: 0
offsetUpdateShutdownTimeout: 2000
default:
batchSize: 16384
batchTimeout: 0
replicationFactor: 1
concurrency: 1
requiredAcks: 1
compressionCodec: none
queueSize: 8192 # must be a power of 2
maxWait: 100
fetchSize: 1048576
minPartitionCount: 1
durableSubscription: false
syncProducer: false
syncProducerTimeout: 5000
---
#Config for admin
spring:
profiles: admin
xd:
transport: rabbit
messagebus:
local:
queueSize: 2147483647
polling: 1000
executor:
corePoolSize: 0
maxPoolSize: 200
queueSize: 2147483647
keepAliveSeconds: 60
rabbit:
compressionLevel: 1
# bus-level property, applies only when 'compress=true' for a stream module
# See java.util.zip.Deflater; 1=BEST_SPEED, 9=BEST_COMPRESSION, ...
longStringLimit: 8192
# Headers longer than this will not be converted to String and will be a
# DataInputStream - such headers will NOT be properly converted back on output.
default:
ackMode: AUTO
# Valid: AUTO (container acks), NONE (broker acks), MANUAL (consumer acks).
# Upper case only.
# Note: MANUAL requires specialized code in the consuming module and is unlikely to be
# used in an XD application. For more information, see
# http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack
autoBindDLQ: false
backOffInitialInterval: 1000
backOffMaxInterval: 10000
backOffMultiplier: 2.0
batchBufferLimit: 10000
batchingEnabled: false
batchSize: 100
batchTimeout: 5000
compress: false
concurrency: 1
deliveryMode: PERSISTENT
durableSubscription: false
maxAttempts: 3
maxConcurrency: 1
prefix: xdbus.
# prefix for queue/exchange names so policies (ha, dle etc.) can be applied
prefetch: 1
replyHeaderPatterns: STANDARD_REPLY_HEADERS,*
republishToDLQ: false
# When false, normal rabbitmq dlq processing; when true, republish to the DLQ with stack trace
requestHeaderPatterns: STANDARD_REQUEST_HEADERS,*
requeue: true
transacted: false
txSize: 1
# redis:
# headers:
# comma-delimited list of additional header names to transport
# default:
# default bus properties, if not specified at the module level
# backOffInitialInterval: 1000
# backOffMaxInterval: 10000
# backOffMultiplier: 2.0
# concurrency: 1
# maxAttempts: 3
kafka:
brokers: kafka:9092
zkAddress: zookeeper:2181
mode: embeddedHeaders
offsetManagement: kafkaTopic
headers:
# comma-delimited list of additional header names to transport
socketBufferSize: 2097152
offsetStoreTopic: SpringXdOffsets
offsetStoreSegmentSize: 25000000
offsetStoreRetentionTime: 60000
offsetStoreRequiredAcks: 1
offsetStoreMaxFetchSize: 1048576
offsetStoreBatchBytes: 16384
offsetStoreBatchTime: 1000
offsetUpdateTimeWindow: 10000
offsetUpdateCount: 0
offsetUpdateShutdownTimeout: 2000
default:
batchSize: 16384
batchTimeout: 0
replicationFactor: 1
concurrency: 1
requiredAcks: 1
compressionCodec: none
queueSize: 8192 # must be a power of 2
maxWait: 100
fetchSize: 1048576
minPartitionCount: 1
durableSubscription: false
syncProducer: false
syncProducerTimeout: 5000
---
# Rabbit MQ Properties
#
# NOTE: sslProperties is mutually exclusive with keyStore, keyStorePassphrase, trustStore, trustStorePassphrase.
# if you set inline properties, values in the properties file location given by 'sslProperties' will be ignored.
#
spring:
rabbitmq:
addresses: my-domain:5672
adminAddresses: http://my-domain:15672
nodes: rabbit@my-domain
username: myusername
password: mypassword
virtual_host: /
useSSL: false
sslProperties:
ssl:
keyStore:
keyStorePassphrase:
trustStore:
trustStorePassphrase:
EDIT 2: It looks like the xd-admin
and xd-container
scripts do not add the messagebus jars to the $CLASSPATH
upon the start of AdminServerApplication
or ContainerServerApplication
. I'm not sure if this is the culprit yet.
The Spring XD startup scripts for xd-admin
and xd-container
do not add any of the messagebus
jars to the classpath before starting their respective applications. They only add jars sitting directly in /lib
and the hadoop jars in lib/hadoop27/
.
I was able to resolve this by modifying the scripts to include the necessary jars on the classpath:
RABBIT_LIB=$APP_HOME/lib/messagebus/rabbit
if [ -d "$RABBIT_LIB" ]; then
for k in "$RABBIT_LIB"/*.jar; do
CLASSPATH="$CLASSPATH":"$k"
done
fi