I referenced the documentation: https://dataflow.spring.io/docs/installation/local/manual/ to run the spring cloud data flow in my local mac, and then deployed an http | log
stream according to https://dataflow.spring.io/docs/stream-developer-guides/getting-started/stream/.
But during the "verifying output (local)" step (https://dataflow.spring.io/docs/stream-developer-guides/getting-started/stream/#local), I failed.
I can see there are a lot of connection error in the log:
docker run -d --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3.7
java -jar spring-cloud-skipper-server-2.9.2.jar
java -jar spring-cloud-dataflow-server-2.10.2.jar
java -jar spring-cloud-dataflow-shell-2.10.2.jar
curl http://localhost:20100 -H "Content-type: text/plain" -d "Happy streaming"
. The request was successfully sent, but no response body.I can get success response from localhost:20100 like:
The docker containers are as follows:
I can verify the output as suggested by the documentation.
But I didn't get the log. Instead, I get the following:
2023-08-09 03:25:47.057 INFO [http-source,2aa0ce95298cd6b1,db9808ed2bfa4443] 1 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2023-08-09 03:26:47.705 ERROR [http-source,2aa0ce95298cd6b1,f116c3c77334209a] 1 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@7a71c2e3]; nested exception is org.springframework.amqp.AmqpIOException: java.net.SocketTimeoutException: connect timed out, failedMessage=GenericMessage [payload=byte[15], headers={content-length=15, http_requestMethod=POST, b3=2aa0ce95298cd6b1-db9808ed2bfa4443-0, nativeHeaders={}, host=localhost:20100, http_requestUrl=http://localhost:20100/, id=1bd16f86-c76a-fb4c-4362-6df88325daf7, contentType=text/plain, user-agent=curl/7.82.0, accept=*/*, timestamp=1691551546926}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1074)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:296)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:277)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$5(FluxMessageChannel.java:123)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth.lambda$null$6(ReactorSleuth.java:324)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.springframework.amqp.AmqpIOException: java.net.SocketTimeoutException: connect timed out
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:70)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:602)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:725)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:249)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2173)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2146)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1072)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.doRabbitSend(AmqpOutboundEndpoint.java:250)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:231)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:180)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 43 more
Caused by: java.net.SocketTimeoutException: connect timed out
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.base/java.net.SocksSocketImpl.connect(Unknown Source)
at java.base/java.net.Socket.connect(Unknown Source)
at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1223)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1173)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connectAddresses(AbstractConnectionFactory.java:640)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:615)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:565)
... 53 more
Fixed by connecting to the rabbitmq using host IP as it's run by docker.
But I followed the documentation to run it using docker, which lead to connection issues. I think the official document should be updated.
To get host IP, for mac osx you can use
ipconfig getifaddr en0
192.168.1.10
And then configure it into the apps during deploying stream step.
app.http.spring.rabbitmq.host=192.168.1.10
app.log.spring.rabbitmq.host=192.168.1.10