I have a custom ForkJoinPool created with parallelism of 25.
customForkJoinPool = new ForkJoinPool(25);
I have a list of 700 file names and I used code like this to download the files from S3 in parallel and cast them to Java objects:
customForkJoinPool.submit(() -> {
return fileNames
.parallelStream()
.map((fileName) -> {
Logger log = Logger.getLogger("ForkJoinTest");
long startTime = System.currentTimeMillis();
log.info("Starting job at Thread:" + Thread.currentThread().getName());
MyObject obj = readObjectFromS3(fileName);
long endTime = System.currentTimeMillis();
log.info("completed a job with Latency:" + (endTime - startTime));
return obj;
})
.collect(Collectors.toList);
});
});
When I look at the logs, I see only 5 threads being used. With a parallelism of 25, I expected this to use 25 threads. The average latency to download and convert the file to an object is around 200ms. What am I missing?
May be a better question is how does a parallelstream figure how much to split the original list before creating threads for it? In this case, it looks like it decided to split it 5 times and stop.
Why are you doing this with ForkJoinPool
? It's meant for CPU-bound tasks with subtasks that are too fast to warrant individual scheduling. Your workload is IO-bound and with 200ms latency the individual scheduling overhead is negligible.
Use an Executor
:
import static java.util.stream.Collectors.toList;
import static java.util.concurrent.CompletableFuture.supplyAsync;
ExecutorService threads = Executors.newFixedThreadPool(25);
List<MyObject> result = fileNames.stream()
.map(fn -> supplyAsync(() -> readObjectFromS3(fn), threads))
.collect(toList()).stream()
.map(CompletableFuture::join)
.collect(toList());