PollableChannel appears to be synchronous by default


Using Spring Integration, I'm sending messages through a QueueChannel from a method annotated with Spring's @Scheduled to a method annotated with @ServiceActivator. I expected the @ServiceActivator method to execute in a separate thread by default, but that does not appear to be the case.

@Configuration
public class SpecificIntegrationConfig {

    @Bean
    public MessageChannel dirListingChannel() {
        return new QueueChannel(5);
    }
}

I send the message using a MessageGateway:

@MessagingGateway
public interface MessageGateway {

    @Gateway(requestChannel = "dirListingChannel")
    void sendListing(List<DirEntry> entries);
}

The sender, which posts a listing of the files in a remote directory:

@RequiredArgsConstructor
public class SftpPollerComponent {
    private final SftpClient sftpClient;
    private final MessageGateway messageGateway;
    @Value("${remote.dir}")
    private final String dir;

    @Scheduled(fixedDelay = 2_000)
    public void pollRemote() throws IOException {
        final ArrayList<SftpClient.DirEntry> dirEntries = new ArrayList<>(20);
        for (final SftpClient.DirEntry entry : this.sftpClient.readDir(this.dir)) {
            dirEntries.add(entry);
        }
        log.info("Sender thread: {}", Thread.currentThread().getName());
        this.messageGateway.sendListing(dirEntries);
    }

}

And the receiver:

@Component
public class DirEntryService {

    @ServiceActivator(inputChannel = "dirListingChannel")
    public void handleFiles(final List<DirEntry> files) throws InterruptedException {
        log.info("Receiving thread name: {}", Thread.currentThread().getName());
        log.info("There are {} files", files.size());
        log.info("Sleeping..");
        Thread.sleep(6000);
        log.info("Service awake.");
    }
}

What I see in the logs is that the receiving method executes on the same thread as the sender (the @Scheduled method), and that the Thread.sleep(6000) in the receiver blocks the sender. In other words, the whole process appears to be synchronous.

2024-11-28T17:45:31.503Z  INFO 12987 --- [   scheduling-1] c.e.i.i.component.SftpPollerComponent    : Sender thread: scheduling-1
2024-11-28T17:45:32.478Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Receiving thread name: scheduling-1
2024-11-28T17:45:32.478Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : There are 10 files
2024-11-28T17:45:32.479Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Sleeping..
2024-11-28T17:45:38.479Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Service awake.
2024-11-28T17:45:39.496Z  INFO 12987 --- [   scheduling-1] c.e.i.i.component.SftpPollerComponent    : Sender thread: scheduling-1
2024-11-28T17:45:40.480Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Receiving thread name: scheduling-1
2024-11-28T17:45:40.481Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : There are 10 files
2024-11-28T17:45:40.481Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Sleeping..
2024-11-28T17:45:46.481Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Service awake.
2024-11-28T17:45:47.497Z  INFO 12987 --- [   scheduling-1] c.e.i.i.component.SftpPollerComponent    : Sender thread: scheduling-1
2024-11-28T17:45:48.482Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Receiving thread name: scheduling-1
2024-11-28T17:45:48.482Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : There are 10 files
2024-11-28T17:45:48.482Z  INFO 12987 --- [   scheduling-1] c.e.i.ingest.service.DirEntryService     : Sleeping..

I had assumed that, when a QueueChannel is used, Spring would by default schedule polling of the queue on another thread. Spring's own docs also say:

If you want the polling to be asynchronous, a poller can optionally specify a task-executor attribute that points to an existing instance of any TaskExecutor bean

This seems to indicate that the behaviour I have observed is the default, which seems to me contrary to the idea of having a pollable queue if the receiver is going to block the sender by default.


Solution

  • The @Scheduled method and the AbstractPollingEndpoint (the poller behind your @ServiceActivator) use the same shared ThreadPoolTaskScheduler from Spring Boot auto-configuration, and that scheduler comes with 1 thread by default. That is why you only ever see the scheduling-1 thread name. It also means that if you block this thread, everything else that is scheduled is blocked as well. See the docs for more info:

    https://docs.spring.io/spring-boot/reference/features/task-execution-and-scheduling.html

    https://docs.spring.io/spring-boot/reference/messaging/spring-integration.html

    I believe the default was set to 1 because the purpose of Spring Boot is to develop microservices. So it starts with minimal resources, and you are free to adjust them as necessary.
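
To make the effect of a one-thread scheduler concrete, here is a minimal sketch in plain Java. It is not Spring Integration code: a java.util.concurrent ScheduledExecutorService stands in for the shared ThreadPoolTaskScheduler, and the class name SharedSchedulerDemo, the method sameThread and the 300 ms sleep are made up for illustration. A "handler" task blocks the way handleFiles does, and a "sender" task is scheduled while the handler is still sleeping.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the shared scheduler; not part of Spring.
class SharedSchedulerDemo {

    /**
     * Schedules a blocking "handler" and then a "sender" on one scheduler.
     * Returns true when both tasks ended up running on the same thread.
     */
    static boolean sameThread(int poolSize) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
        CountDownLatch handlerStarted = new CountDownLatch(1);
        try {
            // Plays the role of the @ServiceActivator method that sleeps.
            Callable<String> handlerTask = () -> {
                handlerStarted.countDown();
                Thread.sleep(300); // assumed blocking time, shortened for the demo
                return Thread.currentThread().getName();
            };
            // Plays the role of the @Scheduled sender.
            Callable<String> senderTask = () -> {
                return Thread.currentThread().getName();
            };

            Future<String> handler = scheduler.schedule(handlerTask, 0, TimeUnit.MILLISECONDS);
            // Only schedule the sender once the handler is already occupying a thread.
            handlerStarted.await();
            Future<String> sender = scheduler.schedule(senderTask, 0, TimeUnit.MILLISECONDS);

            return handler.get().equals(sender.get());
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("pool size 1, same thread: " + sameThread(1));
        System.out.println("pool size 2, same thread: " + sameThread(2));
    }
}
```

With a pool of one thread, the sender has to wait for the sleeping handler and then runs on that same thread, which matches the scheduling-1 log lines in the question. With a second thread available, the sender runs on a different thread while the handler is still blocked. In a Spring Boot application the corresponding setting is the spring.task.scheduling.pool.size property; alternatively, a TaskExecutor can be configured on the poller, as in the documentation passage quoted in the question.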