I am wondering what the expected way to declare RabbitMQ resources is. In my code, I define one or more SuperStream beans. The RabbitAdmin doesn't seem to get called, or at least the .initialize() method isn't called, and I don't see any resources created.
The way I manually trigger the RabbitAdmin to declare the resources is like so:
    @Bean
    ApplicationRunner runner(ConnectionFactory cf) {
        return args -> {
            cf.createConnection().close();
            cf.resetConnection();
        };
    }
This works; however, it seems that some of my consumers start before these resources are declared. My consumers are created using the AMQP Inbound Adapter code from Spring Integration, like this:
    @Bean
    IntegrationFlow streamFlow() {
        return IntegrationFlow.from(RabbitStream
                        .inboundAdapter(env)
                        .superStream("myStream", "myConsumer"))
                // ...
                // endpoints in here
                .get();
    }
Is there an expected way to declare resources? It seems like the old SimpleMessageListenerContainer used to kick it off, but the new StreamListenerContainer does not.
This question also applies to applications that only produce messages to a RabbitMQ Stream (and have no consumers).
Your observation is correct. And we described this behavior in the docs: https://docs.spring.io/spring-amqp/reference/stream.html
However, this will only work if you are also using non-stream components (such as the SimpleMessageListenerContainer or DirectMessageListenerContainer) because the admin is triggered to declare the defined beans when an AMQP connection is opened. If your application only uses stream components, or you wish to use advanced stream configuration features, you should configure a StreamAdmin instead:
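A minimal sketch of such a StreamAdmin bean, modelled on the example in the linked reference documentation. The class name StreamConfig and the stream names are hypothetical placeholders. It assumes an Environment bean is already defined elsewhere and that spring-rabbit-stream is on the classpath. Note that this sketch declares plain streams only; declaring a super stream this way is not shown here.

```java
import com.rabbitmq.stream.Environment;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.support.StreamAdmin;

@Configuration
class StreamConfig { // hypothetical class name

    // The callback receives a StreamCreator from the stream client;
    // each stream(...).create() call declares one stream on the broker.
    @Bean
    StreamAdmin streamAdmin(Environment env) {
        return new StreamAdmin(env, sc -> {
            sc.stream("stream.queue1").create(); // placeholder stream name
            sc.stream("stream.queue2").create(); // placeholder stream name
        });
    }
}
```

Because this goes through the stream Environment, it does not depend on an AMQP connection being opened. Whether it removes the need for the ApplicationRunner workaround in a given setup should be verified against the Spring AMQP version in use.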