java spring-integration spring-cloud-aws spring-integration-aws

ThreadPoolTaskExecutor with just one thread in the pool is not processing messages from AWS queue


I've created an on-demand ChannelAdapter, AsyncTaskExecutor, and Channel for every queue registered in the application. I noticed that when the maxPoolSize of the AsyncTaskExecutor is equal to one, messages are not processed. This is how the AsyncTaskExecutor bean is created:

    static void registerAsyncTaskExecutor(final Consumer consumer, final GenericApplicationContext registry) {
        // Pool settings for this consumer (a project-specific type, not Spring's TaskExecutor interface).
        final TaskExecutor executor = consumer.getExecutor();

        // Describe a ThreadPoolTaskExecutor bean sized from those settings.
        final BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ThreadPoolTaskExecutor.class);
        builder.addPropertyValue("corePoolSize", executor.getCorePoolSize());
        builder.addPropertyValue("maxPoolSize", executor.getMaxPoolSize());
        builder.addPropertyValue("threadNamePrefix", consumer.getName() + "-");

        // Register the bean under the configured executor name.
        final String beanName = executor.getName();
        final BeanDefinition beanDefinition = builder.getBeanDefinition();
        registry.registerBeanDefinition(beanName, beanDefinition);
    }

Another thing I noticed: when java.util.concurrent.ThreadPoolExecutor#execute is called, the condition workerCountOf(c) < corePoolSize is always false. The full project is available at https://github.com/LeoFuso/spring-integration-aws-demo


Solution

  • It is always bad practice to provide a thread pool with just one thread to a managed component. You may not know what that component is going to do with your thread pool, and it may well be that your single thread is taken internally by some long-living task. All new tasks then just stall in the queue, waiting for that single thread to become free, which might never happen.

    In fact, that is exactly what we have with the AsynchronousMessageListener from Spring Cloud AWS, which is used by the mentioned SqsMessageDrivenChannelAdapter:

    public void run() {
            while (isQueueRunning()) {
    

    So, either rely on the default executor or provide enough threads in your own.
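
    The stall can be reproduced without Spring or AWS. The following is a minimal sketch using only java.util.concurrent; the class name, the handlerRuns helper, and the sleep-based loop are hypothetical stand-ins for the listener's polling loop, not the actual Spring Cloud AWS code. It submits a long-living "poller" task that hands a "message handler" task to the same executor, then checks whether the handler ever runs:

    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical demo class: a long-living task starving a fixed-size pool.
    class SingleThreadStarvationDemo {

        /** Returns true if the handler task got a thread within 500 ms for the given pool size. */
        static boolean handlerRuns(int poolSize) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
            AtomicBoolean queueRunning = new AtomicBoolean(true);
            CountDownLatch handled = new CountDownLatch(1);

            // Stand-in for the polling loop: it keeps its thread for as long as the queue is "running".
            executor.execute(() -> {
                // Hand the "message" to the same executor. With one thread, the worker count
                // already equals corePoolSize, so this task is queued instead of started.
                executor.execute(handled::countDown);
                while (queueRunning.get()) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });

            try {
                return handled.await(500, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } finally {
                // Stop the poller and release the pool threads.
                queueRunning.set(false);
                executor.shutdownNow();
            }
        }

        public static void main(String[] args) {
            System.out.println("pool of 1: handler ran = " + handlerRuns(1));
            System.out.println("pool of 2: handler ran = " + handlerRuns(2));
        }
    }
    ```

    With a pool of one thread the handler stays in the queue for as long as the poller runs, which also matches the observation that workerCountOf(c) < corePoolSize is false on every execute call. With two threads the handler gets its own worker.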

    The logic over there for the number of threads looks like this:

        int spinningThreads = this.getRegisteredQueues().size();
    
        if (spinningThreads > 0) {
            threadPoolTaskExecutor
                    .setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);
    

    So, the core pool size is the number of SQS queues we provide, multiplied by 2 worker threads. It looks like we need one thread per queue to poll, plus an extra thread to process the messages from it.
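
    Applied to your own executor, that sizing rule could be sketched as below. This is only an illustration with plain java.util.concurrent: the class, the helper names, and the queue name are hypothetical, and the multiplier of 2 is taken from the reading of the snippet above rather than from a documented contract. In the question's code the same numbers would go into the corePoolSize and maxPoolSize properties.

    ```java
    import java.util.List;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // Hypothetical helper: size a pool from the number of queues it has to serve.
    class QueueExecutorSizing {

        // Assumed multiplier: one polling thread plus one processing thread per queue.
        static final int WORKER_THREADS_PER_QUEUE = 2;

        /** Core pool size for the given number of queues. */
        static int corePoolSizeFor(int queueCount) {
            if (queueCount <= 0) {
                throw new IllegalArgumentException("queueCount must be positive");
            }
            return queueCount * WORKER_THREADS_PER_QUEUE;
        }

        /** Builds a fixed-size executor large enough for the given queues. */
        static ThreadPoolExecutor executorFor(List<String> queueNames) {
            int size = corePoolSizeFor(queueNames.size());
            return new ThreadPoolExecutor(
                    size, size, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        }

        public static void main(String[] args) {
            // "example-queue" is a placeholder name, not one from the linked project.
            ThreadPoolExecutor executor = executorFor(List.of("example-queue"));
            System.out.println("core pool size = " + executor.getCorePoolSize());
            executor.shutdown();
        }
    }
    ```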

    (This is not really a Spring Integration question, though; it is more about Spring Cloud AWS.)