How to properly use SimpleAsyncTaskExecutor when I need to wait for all virtual threads to finish?
Description
I have scheduled code that runs every night. It makes a bunch of queries to the database and puts the results into a ConcurrentHashMap. The data in this ConcurrentHashMap is then sent to users via email.
Problem
The problem is that the old code runs sequentially. I want the queries to run in parallel and fill the ConcurrentHashMap as they finish. Once they have all finished, I want to send the emails sequentially.
Current Attempt
After a lot of research, I figured I should use ExecutorCompletionService with SimpleAsyncTaskExecutor. However, I'm unsure how to tell when the threads have finished their work.
QUESTIONS
Current code
@Component
@Configuration
public class NotificacoesListener {
    ...
    private ExecutorCompletionService<Boolean> getTaskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setVirtualThreads(true);
        taskExecutor.setConcurrencyLimit(4);
        return new ExecutorCompletionService<>(taskExecutor);
    }
    ...
    @Scheduled(cron = MySettings.NOTIFICATION_SUB_SYSTEM_EVERY_NIGHT, zone = "GMT-3:00")
    @Transactional
    public void start() {
        System.out.println("Starting at " + Utils.formatDate(LocalDate.now()) + " ...");
        ...
        ConcurrentHashMap<User, String> map = new ConcurrentHashMap<>();
        ExecutorCompletionService<Boolean> executorCompletionService = getTaskExecutor();
        int virtualThreadsRunning = 0;
        for (Verifica instancia : instancias) {
            virtualThreadsRunning++;
            executorCompletionService.submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    instancia.verifica(tenants, map);
                    return true;
                }
            });
        }
        while (virtualThreadsRunning > 0) {
            try {
                executorCompletionService.take();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            } finally {
                virtualThreadsRunning--;
            }
        }
        sendEmails(map);
        System.out.println("END");
    }
}
Your design is correct in the sense that it 1) parallelizes the instancia.verifica calls, 2) waits for all submitted tasks to complete, and 3) limits the number of parallel threads, but it might be a bit overengineered.
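To make the "waits for completion" part concrete without Spring on the classpath, here is a minimal JDK-only sketch of the same design. A small semaphore-throttled virtual-thread Executor is a hypothetical stand-in for SimpleAsyncTaskExecutor with setConcurrencyLimit(4) (it only roughly mirrors Spring's blocking throttle), and the map.put call stands in for instancia.verifica(tenants, map). The loop calls take() exactly as many times as it called submit(), so when it ends every task has finished; calling get() on each future also rethrows any exception a task threw, which a take()-only loop silently ignores.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Semaphore;

class CompletionServiceSketch {

    // Hypothetical stand-in for SimpleAsyncTaskExecutor with virtual threads and
    // setConcurrencyLimit(limit): execute() blocks the caller until a slot is free,
    // then runs the task on a new virtual thread.
    static Executor throttledVirtualExecutor(int limit) {
        Semaphore slots = new Semaphore(limit);
        return task -> {
            slots.acquireUninterruptibly();
            Thread.ofVirtual().start(() -> {
                try {
                    task.run();
                } finally {
                    slots.release();
                }
            });
        };
    }

    static Map<String, String> collect(List<String> users)
            throws InterruptedException, ExecutionException {
        Map<String, String> map = new ConcurrentHashMap<>();
        ExecutorCompletionService<Boolean> ecs =
                new ExecutorCompletionService<>(throttledVirtualExecutor(4));
        int submitted = 0;
        for (String user : users) {
            // Hypothetical stand-in for instancia.verifica(tenants, map)
            Callable<Boolean> verify = () -> {
                map.put(user, "report for " + user);
                return true;
            };
            ecs.submit(verify);
            submitted++;
        }
        // One take() per submitted task: when this loop ends, every task has completed.
        for (int i = 0; i < submitted; i++) {
            ecs.take().get(); // get() rethrows a task's failure as ExecutionException
        }
        return map;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> map = collect(List.of("ana", "bruno", "carla", "davi", "eva"));
        // All tasks are done here, so the map can be processed sequentially.
        map.forEach((user, body) -> System.out.println(user + " -> " + body));
    }
}
```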
The simplest way is to use a plain old ThreadPoolExecutor with a fixed number of threads and wait for it to terminate:
final ExecutorService executor = Executors.newFixedThreadPool(4);
for (Verifica instancia : instancias) {
    executor.submit(() -> {
        instancia.verifica(tenants, map);
        return true; // returning a value makes this lambda a Callable, so verifica may throw checked exceptions
    });
}
executor.shutdown(); // accept no new tasks; already-submitted tasks keep running
// blocks until all tasks have finished or the timeout elapses; pick a timeout that fits your job
executor.awaitTermination(1, TimeUnit.HOURS);
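A self-contained sketch of the same shutdown/awaitTermination pattern that runs outside Spring. The user names and the map.put call are hypothetical stand-ins for instancias and instancia.verifica(tenants, map), and the one-hour timeout is an arbitrary example. Once awaitTermination returns true, every task has finished and the map can be consumed sequentially, which matches the "query in parallel, then send emails one by one" flow from the question.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FixedPoolSketch {

    // One hypothetical "verify" task per user on a pool of 4 platform threads.
    static Map<String, String> collect(List<String> users) throws InterruptedException {
        Map<String, String> map = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            for (String user : users) {
                executor.submit(() -> {
                    map.put(user, "report for " + user); // stand-in for instancia.verifica
                });
            }
        } finally {
            executor.shutdown(); // no new tasks accepted; submitted ones still run
        }
        // Block until every task has finished; the one-hour timeout is an arbitrary example.
        if (!executor.awaitTermination(1, TimeUnit.HOURS)) {
            executor.shutdownNow();
            throw new IllegalStateException("tasks did not finish in time");
        }
        return map;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, String> map = collect(List.of("ana", "bruno", "carla"));
        // Every task is done here, so the emails can be sent one at a time.
        map.forEach((user, body) -> System.out.println("sending to " + user + ": " + body));
    }
}
```

With submit(), an exception thrown by a task is stored in its Future rather than propagated, so keep the returned futures and call get() on them if a failed query should abort the run.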
This solution does not use virtual threads, but I don't think you really need them for the scenario you described (a nightly cron job running long, I/O-intensive queries). If you still want them, you could use this instead:
Executors.newFixedThreadPool(4, Thread.ofVirtual().factory());
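A small sketch to check the virtual-thread variant, assuming Java 21. It builds the pool with Thread.ofVirtual().factory(), replaces the database queries with a hypothetical 20 ms sleep, and records how many tasks overlap. The pool still caps concurrency at 4, and each task reports whether it actually ran on a virtual thread via Thread.isVirtual().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class VirtualFixedPoolSketch {

    // Submits `tasks` simulated queries to a 4-thread pool whose workers are virtual threads
    // and returns the highest number of tasks observed running at the same time.
    static int peakConcurrency(int tasks) throws Exception {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(4, Thread.ofVirtual().factory());
        List<Future<Boolean>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            futures.add(executor.submit(() -> {
                peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(20); // simulated database query
                } finally {
                    running.decrementAndGet();
                }
                return Thread.currentThread().isVirtual();
            }));
        }
        executor.shutdown();
        if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        for (Future<Boolean> f : futures) {
            if (!f.get()) {
                throw new IllegalStateException("task did not run on a virtual thread");
            }
        }
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("peak concurrency: " + peakConcurrency(20));
    }
}
```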
Pooling virtual threads is not recommended, but my answer to "In java How to migrate from Executors.newFixedThreadPool(MAX_THREAD_COUNT()) to Virtual Thread" argues that there is no harm in doing so.
The recommended way to use virtual threads while limiting concurrency is a Semaphore. In your case it might look like this:
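final List<Thread> threads = new ArrayList<>();
final Semaphore sem = new Semaphore(4); // at most 4 verifica calls at a time
for (final Verifica instancia : instancias) {
    threads.add(Thread.ofVirtual().start(() -> {
        boolean acquired = false;
        try {
            sem.acquire();
            acquired = true;
            instancia.verifica(tenants, map);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (acquired) {
                sem.release(); // only give back a permit this thread actually obtained
            }
        }
    }));
}
for (final Thread thread : threads) {
    thread.join(); // wait until every virtual thread has finished
}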
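A runnable version of the semaphore approach, assuming Java 21 for Thread.ofVirtual(). The user list, the 10 ms sleep and the "report for ..." strings are hypothetical stand-ins for the real queries. It also counts how many tasks hold a permit at once, showing that the semaphore, rather than a thread pool, is what enforces the limit while every task still gets its own virtual thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreSketch {

    record Result(Map<String, String> reports, int peakConcurrency) {}

    // One virtual thread per user; the semaphore caps how many run the simulated query at once.
    static Result collect(List<String> users, int limit) throws InterruptedException {
        Map<String, String> reports = new ConcurrentHashMap<>();
        Semaphore sem = new Semaphore(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (String user : users) {
            threads.add(Thread.ofVirtual().start(() -> {
                boolean acquired = false;
                try {
                    sem.acquire();
                    acquired = true;
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    Thread.sleep(10); // simulated database query
                    reports.put(user, "report for " + user);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    if (acquired) {
                        running.decrementAndGet();
                        sem.release();
                    }
                }
            }));
        }
        for (Thread t : threads) {
            t.join(); // returns once that virtual thread has terminated
        }
        return new Result(reports, peak.get());
    }

    public static void main(String[] args) throws InterruptedException {
        Result r = collect(List.of("ana", "bruno", "carla", "davi", "eva", "fabio"), 4);
        System.out.println(r.reports().size() + " reports, peak concurrency " + r.peakConcurrency());
    }
}
```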
From a synchronization standpoint, the solutions above might be slightly more performant, because your solution adds two extra synchronization points: the BlockingQueue inside ExecutorCompletionService and the synchronized block that enforces the concurrency limit in SimpleAsyncTaskExecutor. In your scenario, though, the performance difference will most likely be negligible.