
Spring Integration + SQS - retry on exception doesn't work


I'm working on integrating Spring Integration with an AWS SQS queue.

I have an issue when my method annotated with @ServiceActivator throws an exception. In that case the message seems to be removed from the queue anyway, even though I've set the message deletion policy to SqsMessageDeletionPolicy.ON_SUCCESS on the SqsMessageDrivenChannelAdapter.

Here is my channel/adapter configuration: https://github.com/sdusza1/spring-integration-sqs/blob/master/src/main/java/com/example/demo/ChannelConfig.java

I've tried doing the same with the @SqsListener annotation, and there the messages are not deleted when the listener throws, as expected.

I've created a mini Spring Boot app here to demonstrate this issue: https://github.com/sdusza1/spring-integration-sqs

Please help :)


Solution

  • Your configuration is like this:

    @Bean
    public MessageProducerSupport sqsMessageDrivenChannelAdapter() {
        SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(amazonSqs, SQS_QUEUE_NAME);
        adapter.setOutputChannel(inboundChannel());
        adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
        adapter.setVisibilityTimeout(RETRY_NOTIFICATION_AFTER);
        return adapter;
    }

    Where the inboundChannel is like this:

    @Bean
    public QueueChannel inboundChannel() {
        return new QueueChannel();
    }
    

    So this is a queue channel, and therefore asynchronous: a message from that queue is processed on a separate thread by the TaskScheduler, which polls this kind of channel according to your PollerMetadata configuration. Any error in the consumer is thrown on that thread as well and never reaches the SqsMessageDrivenChannelAdapter, so the adapter sees a successful hand-off and the ON_SUCCESS policy deletes the message.

    Technically, this is completely different from your @SqsListener experience: that method is called directly on the listener container's thread, so the container's error handling applies.

    So you either need to revise how you handle errors on that separate thread, or simply not use a QueueChannel right after the SqsMessageDrivenChannelAdapter and let the exception propagate, so that errors are handled in the underlying SQS listener container, as they are with @SqsListener.
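
The threading difference described above can be reproduced without Spring or SQS. The following is only a minimal sketch in plain Java, not the adapter's actual implementation: HandoffDemo, deliverDirect, deliverQueued and FAILING_HANDLER are hypothetical names invented for the illustration, and the boolean "deletable" result stands in for what an ON_SUCCESS-style policy would decide on the receiving thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in (not Spring code): models the two ways a receiving
// thread can hand a message over to a handler.
final class HandoffDemo {

    // A handler that always fails, like a @ServiceActivator method that throws.
    static final Consumer<String> FAILING_HANDLER = payload -> {
        throw new IllegalStateException("processing failed: " + payload);
    };

    // Direct hand-off: the handler runs on the caller's thread, so the caller
    // sees the exception. Returns true if the message would be deleted.
    static boolean deliverDirect(String payload, Consumer<String> handler) {
        try {
            handler.accept(payload);
            return true;   // handler succeeded: safe to delete
        } catch (RuntimeException e) {
            return false;  // failure is visible: keep the message for a retry
        }
    }

    // Queued hand-off: the caller only enqueues; another thread runs the handler.
    // Returns true if the message would be deleted from the caller's viewpoint.
    static boolean deliverQueued(String payload, Consumer<String> handler,
                                 AtomicReference<Throwable> pollerError) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add(payload);        // the caller's part ends here, without error
        boolean deletable = true;  // so for the caller the delivery succeeded

        Thread poller = new Thread(() -> {
            try {
                handler.accept(queue.poll());
            } catch (RuntimeException e) {
                pollerError.set(e);  // the failure stays on the poller thread
            }
        });
        poller.start();
        try {
            poller.join();  // only so the demo can inspect pollerError afterwards
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return deletable;
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> pollerError = new AtomicReference<>();
        System.out.println("direct deletable: " + deliverDirect("m1", FAILING_HANDLER));
        System.out.println("queued deletable: " + deliverQueued("m1", FAILING_HANDLER, pollerError));
        System.out.println("poller error: " + pollerError.get());
    }
}
```

With the failing handler, the direct hand-off reports the message as not deletable, while the queued hand-off reports it as deletable even though the handler failed on the other thread. In Spring Integration the direct variant corresponds to a DirectChannel, which invokes its subscriber on the sender's thread, so declaring inboundChannel() as a DirectChannel instead of a QueueChannel is one way to follow the second suggestion.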