I'm trying to use the "new" Streams plugin for RabbitMQ in my spring-cloud-stream project, which uses the functional programming model.
I have set up my application.yaml like this:
spring:
  rabbitmq:
    listener:
      type: stream
    stream:
      host: ${RABBIT_HOST:localhost}
      port: ${RABBIT_PORT:5672}
      username: guest
      password: guest
      name: demo
  cloud:
    function:
      definition: testConsumer
    stream:
      rabbit:
        bindings:
          testConsumer-in-0:
            consumer:
              containerType: stream
      bindings:
        testConsumer-in-0:
          group: demo
          destination: test
        testProducer-out-0:
          destination: test
I have a @PostConstruct method that uses StreamBridge like this:
streamBridge.send("testProducer-out-0", "testing..");
And my testConsumer looks like this:
@Bean
public Consumer<Flux<String>> testConsumer() {
    return flux -> flux.doOnEach(LOGGER::info);
}
But when I start my application, I get this exception:
Caused by: com.rabbitmq.stream.StreamException: Could not get response in 10000 ms
And in the log of my RabbitMQ container I get this error:
2022-09-14 13:30:53.485574+00:00 [error] <0.32309.0> {bad_header,<<0,0,1,0,0,17,0,1>>}
If I set spring.cloud.stream.rabbit.bindings.testConsumer-in-0.consumer.containerType to direct, everything works fine.
Does anyone have an idea of why?
It looks like you are trying to connect over the AMQP port, not the stream port: your spring.rabbitmq.stream.port falls back to 5672, which is the AMQP port.
The stream port is 5552 by default.
Are you mapping the streams port and enabling the plugin? https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-first-application/
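As a rough sketch, assuming the stream plugin is enabled on the broker and port 5552 is reachable from your application, the stream section of your configuration would look something like this. Note that RABBIT_STREAM_PORT is a hypothetical variable name I made up so it does not collide with whatever you use for the AMQP port; everything else is copied from your question.

```yaml
spring:
  rabbitmq:
    listener:
      type: stream
    stream:
      host: ${RABBIT_HOST:localhost}
      # Stream protocol port (5552 by default), not the AMQP port 5672.
      # RABBIT_STREAM_PORT is a hypothetical variable name (assumption).
      port: ${RABBIT_STREAM_PORT:5552}
      username: guest
      password: guest
      name: demo
```

The plugin itself is enabled with rabbitmq-plugins enable rabbitmq_stream. If the broker runs in Docker, you would also need to publish the port, for example with -p 5552:5552.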