I have 10 rabbitMQ queues, called event.q.0, event.q.2, <...>, event.q.9. Each of these queues receive messages routed from event.consistent-hash exchange. I want to build a fault tolerant solution that will consume messages for a specific event in sequential manner, since ordering is important. For this I have set up a flow that listens to those queues and routes messages based on event ID to a specific worker flow. Worker flows work based on queue channels so that should guarantee the FIFO order for an event with specific ID. I have come up with with the following set up:
public IntegrationFlow eventConsumerFlow(RabbitTemplate rabbitTemplate, Advice retryAdvice) {
return IntegrationFlows
Amqp.inboundAdapter(new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory()))
.configureContainer(c -> c
.<Event, String>route(e -> String.format("worker-input-%d", e.getId() % numberOfWorkers))
private Advice deadLetterAdvice() {
return RetryInterceptorBuilder
private ExponentialBackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
return backOffPolicy;
private MessageRecoverer recoverer() {
return new RepublishMessageRecoverer(
public void init() {
for (int i = 0; i < numberOfWorkers; i++) {
flowContext.registration(workerFlow(MessageChannels.queue(String.format("worker-input-%d", i), queueCapacity).get()))
.id(String.format("worker-flow-%d", i))
private IntegrationFlow workerFlow(QueueChannel channel) {
return IntegrationFlows
.<Object, Class<?>>route(Object::getClass, m -> m
.subFlowMapping(EventOne.class, s -> s.handle(oneHandler))
.subFlowMapping(EventTwo.class, s -> s.handle(anotherHandler))
Now, when lets say an error happens in eventConsumerFlow
, the retry mechanism works as expected, but when an error happens in workerFlow
, the retry doesn't work anymore and the message doesn't get sent to dead letter exchange. I assume this is because once message is handed off to QueueChannel, it gets acknowledged automatically. How can I make the retry mechanism work in workerFlow
as well, so that if exception happens there, it could retry a couple of times and send a message to DLX when tries are exhausted?
If you want resiliency, you shouldn't be using queue channels at all; the messages will be acknowledged immediately after the message is put in the in-memory queue;if the server crashes, those messages will be lost.
You should configure a separate adapter for each queue if you want no message loss.
That said, to answer the general question, any errors on downstream flows (including after a queue channel) will be sent to the errorChannel
defined on the inbound adapter.