
Spring Cloud Stream with RabbitMQ Streams


I'm trying to use the "new" Streams plugin for RabbitMQ in my spring-cloud-stream project, which uses the functional programming model.

I have set up my application.yaml like this:

spring:
  rabbitmq:
    listener:
      type: stream
    stream:
      host: ${RABBIT_HOST:localhost}
      port: ${RABBIT_PORT:5672}
      username: guest
      password: guest
      name: demo
  cloud:
    function:
      definition: testConsumer
    stream:
      rabbit:
        bindings:
          testConsumer-in-0:
            consumer:
              containerType: stream
      bindings:
        testConsumer-in-0:
          group: demo
          destination: test
        testProducer-out-0:
          destination: test

I have a @PostConstruct method that uses StreamBridge like this:

streamBridge.send("testProducer-out-0", "testing..");

And my testConsumer looks like this:

@Bean
public Consumer<Flux<String>> testConsumer() {
    return flux -> flux.doOnEach(LOGGER::info);
}

But when I start my application, I get this exception:

Caused by: com.rabbitmq.stream.StreamException: Could not get response in 10000 ms

And in the log of my RabbitMQ container I get this error:

2022-09-14 13:30:53.485574+00:00 [error] <0.32309.0> {bad_header,<<0,0,1,0,0,17,0,1>>}

If I set spring.cloud.stream.rabbit.bindings.testConsumer-in-0.consumer.containerType to direct, everything works fine.

Does anyone know why this happens?


Solution

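  • It looks like you are connecting to the AMQP port rather than the stream port: spring.rabbitmq.stream.port in your configuration falls back to 5672, which is the AMQP port.

    The stream plugin listens on port 5552 by default. The bad_header error in the broker log is consistent with this: the AMQP listener received bytes that are not an AMQP protocol header.

    Have you enabled the stream plugin and mapped the stream port on your RabbitMQ container? See https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-first-application/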
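
As a minimal sketch of the fix, assuming the broker exposes the stream plugin on its default port, only the stream section of the configuration from the question needs to change. RABBIT_STREAM_PORT is a hypothetical variable name used here for illustration; any name works as long as it does not resolve to the AMQP port.

```yaml
spring:
  rabbitmq:
    listener:
      type: stream
    stream:
      host: ${RABBIT_HOST:localhost}
      # Stream protocol port (default 5552), not the AMQP port (5672).
      # RABBIT_STREAM_PORT is a hypothetical name, not taken from the question.
      port: ${RABBIT_STREAM_PORT:5552}
      username: guest
      password: guest
      name: demo
```

The regular AMQP connection is configured separately through spring.rabbitmq.host and spring.rabbitmq.port, so it can stay on 5672 while the stream connection uses 5552.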