Tags: rabbitmq, spring-integration, spring-integration-dsl, spring-integration-amqp

How to trigger functionality after RabbitMQ retry max attempts are exhausted? (Spring Integration - RabbitMQ Listener)


I want to send an email when the RabbitMQ listener has exhausted its retries and the handler still fails.

The retry logic works with the code below. But how do I trigger the functionality (sending the email) once the maximum number of retry attempts has been reached?

@Bean
public SimpleMessageListenerContainer container() {
    SimpleMessageListenerContainer container =
            new SimpleMessageListenerContainer(connectionFactory());
    container.setQueues(myQueue());
    container.setDefaultRequeueRejected(false);
    Advice[] adviceArray = new Advice[]{interceptor()};
    container.setAdviceChain(adviceArray);
    return container;
}

@Bean
public IntegrationFlow inboundFlow() {
    return IntegrationFlows.from(
            Amqp.inboundAdapter(container()))
            .log()
            .handle(listenerBeanName, listenerMethodName)
            .get();
}

@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(retryMaxAttempts)
            .backOffOptions(initialInterval, multiplier, maxInterval)
            // .recoverer(new RejectAndDontRequeueRecoverer())
            .recoverer(new CustomRejectAndRecoverer())
            .build();
}

Adding the code of the custom recoverer:

@Service
public class CustomRejectAndRecoverer implements MessageRecoverer {

    @Autowired
    private EmailGateway emailgateway;

    @Override
    public void recover(Message message, Throwable cause) {
        // INSERT CODE HERE.... HOW TO CALL GATEWAY
        // emailgateway.sendMail(cause);
        throw new ListenerExecutionFailedException("Retry Policy Exhausted",
                new AmqpRejectAndDontRequeueException(cause), message);
    }
}

Solution

  • That's exactly what the .recoverer() option of the RetryInterceptorBuilder is for.

    You currently use a RejectAndDontRequeueRecoverer there, but nothing stops you from implementing your own MessageRecoverer that delegates to the RejectAndDontRequeueRecoverer and also sends a message to some MessageChannel backed by the email-sending logic.
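
    A minimal sketch of such a recoverer might look like the following. The class name EmailingRejectRecoverer and the channel bean name "emailChannel" are assumptions made for illustration, and an email-sending flow (or the EmailGateway from the question) is assumed to consume from that channel. The failure cause is sent as an ErrorMessage, and the rejection is delegated in a finally block so the AMQP message is still rejected without requeue even if the notification itself fails.

```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;

@Component
public class EmailingRejectRecoverer implements MessageRecoverer {

    // Hypothetical channel; an email-sending flow is assumed to subscribe to it.
    private final MessageChannel emailChannel;

    // Keeps the original behavior: reject the message without requeueing it.
    private final MessageRecoverer delegate = new RejectAndDontRequeueRecoverer();

    public EmailingRejectRecoverer(@Qualifier("emailChannel") MessageChannel emailChannel) {
        this.emailChannel = emailChannel;
    }

    @Override
    public void recover(Message message, Throwable cause) {
        try {
            // Called only after the retry attempts are exhausted.
            emailChannel.send(new ErrorMessage(cause));
        } finally {
            // Always reject, even if sending the notification throws.
            delegate.recover(message, cause);
        }
    }
}
```

    For the injected dependencies to be populated, the recoverer has to be a Spring-managed bean that is passed to .recoverer(...); an instance created with new, as in the question's interceptor() method, will not have its @Autowired fields set.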