java asynchronous spring-integration spring-integration-aws

Spring Integration - Async service activator for SQS messages


I am trying to implement an integration flow for an SQS queue using a void async service activator, but the handling logic is never triggered. The message is received in the flow and successfully converted by my custom transformer, but the async handling never completes.

This is my configuration class:

@Configuration
public class SqsConfiguration {
    /**
     ...
     ...
    **/


    @Bean("amazonSQSClientConfiguration")
    ClientConfiguration getAmazonSQSClientConfiguration() {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setConnectionTimeout(connectionTimeout);
        clientConfiguration.setMaxConnections(maxConnections);
        clientConfiguration.setSocketTimeout(socketTimeout);
        clientConfiguration.setMaxConsecutiveRetriesBeforeThrottling(maxConsecutiveRetriesBeforeThrottling);
        return clientConfiguration;
    }

    @Bean("amazonSQSAsync")
    AmazonSQSAsync getAmazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard()
                .withClientConfiguration(getAmazonSQSClientConfiguration())
                .withRegion(this.region)
                .build();
    }

    @Bean("amazonSQSRequestListenerContainerConsumerPool")
    protected ThreadPoolTaskExecutor amazonSQSRequestListenerContainerConsumerPool() {
        int maxSize = (int) Math.round(concurrentHandlers * poolSizeFactor);
        int queueCapacity = (int) Math.round(concurrentHandlers * poolQueueSizeFactor);
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(concurrentHandlers);
        taskExecutor.setMaxPoolSize(maxSize);
        taskExecutor.setKeepAliveSeconds(poolKeepAliveTimeSeconds);
        taskExecutor.setQueueCapacity(queueCapacity);
        taskExecutor.setThreadFactory(new NamedDaemonThreadFactory("AmazonSQSRequestHandler"));
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        log.info(
                String.format(
                        "Amazon SQS request handler pool settings: {coreSize: %d, maxSize: %d, queueCapacity: %d}",
                        concurrentHandlers,
                        maxSize,
                        queueCapacity
                )
        );
        return taskExecutor;
    }

    @Bean("sqsMessageDrivenChannelAdapter")
    public MessageProducerSupport sqsMessageDrivenChannelAdapter() {
        SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(getAmazonSQSAsync(), this.queueName);
        adapter.setMaxNumberOfMessages(this.maxNumberOfMessages);
        adapter.setVisibilityTimeout(this.visibilityTimeout);
        adapter.setSendTimeout(this.sendTimeout);
        adapter.setWaitTimeOut(this.waitTimeOut);
        adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
        adapter.setTaskExecutor(amazonSQSRequestListenerContainerConsumerPool());
        return adapter;
    }


    @Bean
    @SuppressWarnings("unchecked")
    IntegrationFlow sqsRequestIntegrationFlow() {
        SqsEventHandlerDispatcher commandHandler = applicationContext.getBean(SqsEventHandlerDispatcher.class);
        return IntegrationFlows.from(sqsMessageDrivenChannelAdapter())
                .transform(converter::toEvent)
                .log()
                .handle(commandHandler, "handle", a -> a.async(true))
                .log()
                .get();
    }
}

This is my handler:

@Slf4j
@Component
@MessageEndpoint
public class SqsEventHandlerDispatcher {
    /**
     ...
     ...
    **/


    public ListenableFuture<?> handle(EventMessage event) {

        return new ListenableFutureTask<Void>(() -> doHandle(event), null);
    }

    private void doHandle(EventMessage event) {
         //my handling logic 

    }
}

The logic in the doHandle() method is never reached.

The same integration flow with a synchronous handler that returns void works perfectly:

    @Bean
    @SuppressWarnings("unchecked")
    IntegrationFlow sqsRequestIntegrationFlow() {
        SqsEventHandlerDispatcher commandHandler = applicationContext.getBean(SqsEventHandlerDispatcher.class);
        return IntegrationFlows.from(sqsMessageDrivenChannelAdapter())
                .transform(converter::toEvent)
                .log()
                .handle(commandHandler, "handle")
                .log()
                .get();
    }

===============================================================================
@Slf4j
@Component
@MessageEndpoint
public class SqsEventHandlerDispatcher {

    public void handle(EventMessage event) {
        // my handling logic
    }

}

Am I missing something? Or can I achieve this by using Mono? I don't have much experience with either Spring Integration or async processing.


Solution

  • I found a solution using reactive Java (Project Reactor's Mono). This is how my service activator looks now:

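    public Mono<Void> handle(EventMessage event, @Header(AwsHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
        // Run the handler off the caller thread and acknowledge only on success.
        // Note: Schedulers.elastic() is deprecated in newer Reactor versions
        // in favor of Schedulers.boundedElastic().
        return Mono.<Void>fromRunnable(() -> doHandle(event))
                .subscribeOn(Schedulers.elastic())
                .doOnSuccess(r -> {
                    log.trace("Message successfully processed. Will delete it now!");
                    acknowledgment.acknowledge();
                });
    }

    private void doHandle(EventMessage event) {
        // my handling logic
    }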
    

    I've also changed the SQS message deletion policy to NEVER, so a message is deleted only after I manually acknowledge it once it has been processed successfully.

    adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
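
For what it's worth, the likely reason the original version never reached doHandle(): Spring's ListenableFutureTask extends the JDK FutureTask, and a FutureTask only does its work when run() is called on it or it is handed to an executor. The original handle() created the task and returned it without ever starting it. The sketch below shows this with plain JDK classes only, and also shows the same acknowledge-on-success idea with CompletableFuture. The class name AsyncHandleSketch, the Ack interface (a stand-in for the real Acknowledgment type), and both helper methods are hypothetical names made up for illustration, not part of Spring Integration or spring-integration-aws.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AsyncHandleSketch {

    /** Hypothetical stand-in for the acknowledgment header object. */
    public interface Ack {
        void acknowledge();
    }

    /** Mirrors the original handler: the task is created but never started. */
    public static FutureTask<Void> neverStarted(Runnable work) {
        return new FutureTask<>(work, null);
    }

    /**
     * Runs the work on the given executor and acknowledges only if it
     * completes normally. If the work throws, the returned future completes
     * exceptionally and acknowledge() is not called.
     */
    public static CompletableFuture<Void> handleAsync(Runnable work, Ack ack, Executor executor) {
        return CompletableFuture.runAsync(work, executor)
                .thenRun(ack::acknowledge);
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean ran = new AtomicBoolean(false);
        FutureTask<Void> task = neverStarted(() -> ran.set(true));

        // Nothing has executed the task yet, so it is not done.
        System.out.println("before execute: done=" + task.isDone() + ", ran=" + ran.get());

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Handing the task to an executor is what actually starts it.
            pool.execute(task);
            task.get(5, TimeUnit.SECONDS);
            System.out.println("after execute: done=" + task.isDone() + ", ran=" + ran.get());

            AtomicBoolean acked = new AtomicBoolean(false);
            handleAsync(() -> { /* handling logic */ }, () -> acked.set(true), pool)
                    .get(5, TimeUnit.SECONDS);
            System.out.println("acknowledged=" + acked.get());
        } finally {
            pool.shutdown();
        }
    }
}
```

Under that assumption, submitting the ListenableFutureTask to an executor before returning it from handle() would probably also have worked, although the Mono version above keeps the threading and the acknowledgment in one place.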