Using Spring Integration, I'm sending messages via a QueueChannel from a method annotated with Spring's @Scheduled to a method annotated with @ServiceActivator. I expected the @ServiceActivator-annotated method to execute in a separate thread by default, but that appears not to be the case.
@Configuration
public class SpecificIntegrationConfig {

    @Bean
    public MessageChannel dirListingChannel() {
        return new QueueChannel(5);
    }
}
I send the message using a MessageGateway:
@MessagingGateway
public interface MessageGateway {

    @Gateway(requestChannel = "dirListingChannel")
    void sendListing(List<DirEntry> entries);
}
The sending of messages (a listing of files in a remote directory):
@RequiredArgsConstructor
public class SftpPollerComponent {

    private final SftpClient sftpClient;
    private final MessageGateway messageGateway;

    @Value("${remote.dir}")
    private final String dir;

    @Scheduled(fixedDelay = 2_000)
    public void pollRemote() throws IOException {
        final ArrayList<SftpClient.DirEntry> dirEntries = new ArrayList<>(20);
        for (final SftpClient.DirEntry entry : this.sftpClient.readDir(this.dir)) {
            dirEntries.add(entry);
        }
        log.info("Sender thread: {}", Thread.currentThread().getName());
        this.messageGateway.sendListing(dirEntries);
    }
}
And receiving of the messages:
@Component
public class DirEntryService {

    @ServiceActivator(inputChannel = "dirListingChannel")
    public void handleFiles(final List<DirEntry> files) throws InterruptedException {
        log.info("Receiving thread name: {}", Thread.currentThread().getName());
        log.info("Sleeping..");
        Thread.sleep(6000);
        log.info("Service awake.");
    }
}
What I see in the logging is that the receiving method executes on the same thread as the sender/@Scheduled method, and that the enforced Thread.sleep(6000) in the receiver blocks the sender, i.e. the whole process appears to be synchronous.
2024-11-28T17:45:31.503Z INFO 12987 --- [ scheduling-1] c.e.i.i.component.SftpPollerComponent : Sender thread: scheduling-1
2024-11-28T17:45:32.478Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Receiving thread name: scheduling-1
2024-11-28T17:45:32.478Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : There are 10 files
2024-11-28T17:45:32.479Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Sleeping..
2024-11-28T17:45:38.479Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Service awake.
2024-11-28T17:45:39.496Z INFO 12987 --- [ scheduling-1] c.e.i.i.component.SftpPollerComponent : Sender thread: scheduling-1
2024-11-28T17:45:40.480Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Receiving thread name: scheduling-1
2024-11-28T17:45:40.481Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : There are 10 files
2024-11-28T17:45:40.481Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Sleeping..
2024-11-28T17:45:46.481Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Service awake.
2024-11-28T17:45:47.497Z INFO 12987 --- [ scheduling-1] c.e.i.i.component.SftpPollerComponent : Sender thread: scheduling-1
2024-11-28T17:45:48.482Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Receiving thread name: scheduling-1
2024-11-28T17:45:48.482Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : There are 10 files
2024-11-28T17:45:48.482Z INFO 12987 --- [ scheduling-1] c.e.i.ingest.service.DirEntryService : Sleeping..
I had assumed that, by specifying a QueueChannel, Spring would by default automatically schedule polling of the queue and use another thread. Also, according to Spring's own docs:
If you want the polling to be asynchronous, a poller can optionally specify a task-executor attribute that points to an existing instance of any TaskExecutor bean
This seems to indicate that the behaviour I have observed is the default, which seems to me contrary to the idea of having a pollable queue if, by default, the receive is going to block the sender.
The @Scheduled method and the AbstractPollingEndpoint use the same shared ThreadPoolTaskScheduler from Spring Boot auto-configuration, and that one comes with 1 thread by default. That's why you only ever see the scheduling-1 name. Plus, if you block this thread, everything else that is scheduled is going to be blocked as well. See more info in the docs:
https://docs.spring.io/spring-boot/reference/features/task-execution-and-scheduling.html
https://docs.spring.io/spring-boot/reference/messaging/spring-integration.html
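The blocking effect described above is not specific to Spring; it can be reproduced with a plain JDK scheduler. The following is only a minimal sketch under stated assumptions: the class name, the method name and the 300 ms / 50 ms timings are made up for illustration, and the two tasks merely stand in for the @ServiceActivator handler and the @Scheduled sender from the question.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class; not part of the question's code base.
class SingleThreadSchedulerDemo {

    // Returns how many milliseconds after scheduling the "sender" task actually started.
    static long senderStartDelayMillis(int poolSize) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
        AtomicLong senderStartedNanos = new AtomicLong();
        CountDownLatch senderRan = new CountDownLatch(1);
        long begin = System.nanoTime();
        try {
            // "Receiver": starts immediately and blocks its thread for 300 ms.
            scheduler.schedule(() -> {
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 0, TimeUnit.MILLISECONDS);

            // "Sender": due after 50 ms, but it needs a free scheduler thread to run.
            scheduler.schedule(() -> {
                senderStartedNanos.set(System.nanoTime());
                senderRan.countDown();
            }, 50, TimeUnit.MILLISECONDS);

            senderRan.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
        return TimeUnit.NANOSECONDS.toMillis(senderStartedNanos.get() - begin);
    }

    public static void main(String[] args) {
        System.out.println("1 thread:  sender started after ~" + senderStartDelayMillis(1) + " ms");
        System.out.println("2 threads: sender started after ~" + senderStartDelayMillis(2) + " ms");
    }
}
```

With a pool of one thread the sender has to wait until the sleeping task finishes, so its start is pushed back to roughly the length of the sleep; with two threads it should start close to its own 50 ms delay.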
I believe it is 1 by default because the purpose of Spring Boot is to develop microservices. So we start with minimal resources, and you are free to adjust that as necessary.
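As for adjusting: one option is to give the shared scheduler more threads through the Spring Boot property spring.task.scheduling.pool.size. Another is the task executor mentioned in the docs quoted in the question, so that the polled message is handed off to a separate thread. A rough sketch of the second option follows. It assumes Spring Integration's @Poller annotation with its fixedDelay and taskExecutor attributes; the bean name dirListingExecutor, the thread-name prefix, the pool size and the 1000 ms poll interval are all made up for illustration, and the payload type is left as List<?> because the question does not show its DirEntry import.

```java
import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

// Sketch only: in a real project these two classes would live in separate files.
@Configuration
class PollerExecutorConfig {

    // Hypothetical executor bean; the name is referenced by the @Poller below.
    @Bean
    public TaskExecutor dirListingExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);                     // assumed size, tune as needed
        executor.setThreadNamePrefix("dir-listing-");    // makes the thread visible in logs
        return executor;
    }
}

@Component
class DirEntryService {

    // The poll is still triggered by the shared scheduler, but receiving and
    // handling the message is delegated to the executor named in taskExecutor.
    @ServiceActivator(
            inputChannel = "dirListingChannel",
            poller = @Poller(fixedDelay = "1000", taskExecutor = "dirListingExecutor"))
    public void handleFiles(final List<?> files) throws InterruptedException {
        Thread.sleep(6000); // simulated slow work, as in the question
    }
}
```

If this behaves as intended, the receiving thread name in the log should start with dir-listing- instead of scheduling-1, and the @Scheduled sender should no longer wait for the handler to wake up.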