spring-amqprabbitmq-stream

Rabbit Streams Resource Declaration


I am wondering what the expected way to declare RabbitMQ resources is. In my code, I define one or more SuperStream beans. The RabbitAdmin doesn't seem to get called - or at least the .initialize() method isn't called, and I don't see any resoures created.

The way I manually trigger the RabbitAdmin to declare the resources is like so:

@Bean
    ApplicationRunner runner(ConnectionFactory cf) {
        return args -> {
            cf.createConnection().close();
            cf.resetConnection();
        };
    }

This works, however it seems that some of my consumers start before these resources are declared. My consumers are created using the AMQP Inbound Adapter code from Spring Integration, like this:

@Bean
IntegrationFlow streamFlow() {
     return IntegrationFlow.from(RabbitStream
        .inboundAdapter(env)
        .superStream("myStream", "myConsumer"))
        // ...
        // endpoints in here
        .get();
}

Is there an expected way to declare resources? It seems like the old SimpleMessageListenercontainer used to kick it off, but not the new StreamListenerContainer.

This question also applies to applications that only produce messages to a RabbitMQ Stream (has no consumers).


Solution

  • Your observation is correct. And we described this behavior in the docs: https://docs.spring.io/spring-amqp/reference/stream.html

    However, this will only work if you are also using non-stream components (such as the SimpleMessageListenerContainer or DirectMessageListenerContainer) because the admin is triggered to declare the defined beans when an AMQP connection is opened. If your application only uses stream components, or you wish to use advanced stream configuration features, you should configure a StreamAdmin instead: