I recently ran a benchmark comparing the performance of nested streams in 3 cases:

- nested `parallelStream` (both the outer and inner loops parallel) - this effectively tests `ForkJoinPool.commonPool()`
- a parallel outer loop with a sequential inner loop
- a parallel outer loop that runs each inner `parallelStream` in a new `ForkJoinPool` for each task

Here's the benchmark code (I've used JMH):
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;

public class NestedPerf {

    @State(Scope.Benchmark)
    public static class StateData {
        public static final List<Integer> outerLoop = IntStream.range(0, 32).boxed().toList();
        public static final List<Integer> innerLoop = IntStream.range(0, 32).boxed().toList();
    }

    private static void runInNewPool(Runnable task) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.submit(task).join();
        } finally {
            pool.shutdown();
        }
    }

    private static void innerParallelLoop() {
        StateData.innerLoop.parallelStream().unordered().forEach(i -> {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    private static void innerSequentialLoop() {
        StateData.innerLoop.stream().unordered().forEach(i -> {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    @Benchmark
    public void testingNewPool(Blackhole bh) {
        StateData.outerLoop.parallelStream().unordered().forEach(i -> {
            runInNewPool(NestedPerf::innerParallelLoop);
            bh.consume(i);
        });
    }

    @Benchmark
    public void testingCommonPoolWithSequentialInner(Blackhole bh) {
        StateData.outerLoop.parallelStream().unordered().forEach(i -> {
            innerSequentialLoop();
            bh.consume(i);
        });
    }

    @Benchmark
    public void testingCommonPool(Blackhole bh) {
        StateData.outerLoop.parallelStream().unordered().forEach(i -> {
            innerParallelLoop();
            bh.consume(i);
        });
    }
}
And here is the output:
Benchmark                                         Mode  Cnt   Score   Error  Units
NestedPerf.testingCommonPool                     thrpt   25   1.935 ± 0.005  ops/s
NestedPerf.testingCommonPoolWithSequentialInner  thrpt   25   1.744 ± 0.007  ops/s
NestedPerf.testingNewPool                        thrpt   25  22.648 ± 0.559  ops/s
The difference between the method with new pools and the method using the common pool is surprising. Does anyone have an idea why creating new pools makes things around 12x faster for this benchmark?
If it helps, I'm running this on a Core i7-10850H system with 12 available CPUs (6 cores + hyper-threading).
Your tasks are simply a call to `Thread::sleep`. That blocks the calling thread, which means the OS will not schedule the thread for execution until the specified duration elapses. This leaves the CPU free to execute any other threads. In other words, your tasks are not CPU-bound and thus do not burden the CPU, which means throwing more threads at your set of tasks is going to increase throughput without overwhelming the CPU.
By using multiple fork-join pools, you are effectively increasing the number of threads available to execute your tasks. It's not much different from simply increasing the number of threads in a single pool. Whether you have 1 pool with 15 threads or 3 pools with 5 threads each, you still end up with a total of 15 threads.
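To make that concrete, here's a small sketch of my own (not part of the benchmark above) showing that the total thread budget is what matters, not how it is split across pools. One pool with parallelism 15 and three pools with parallelism 5 each finish 15 concurrent sleeps in roughly the same wall time:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class PoolSplitDemo {

    // Runs 15 five-millisecond sleeps spread round-robin over the given pools
    // and reports the wall time.
    static long run(List<ForkJoinPool> pools) {
        long start = System.nanoTime();
        List<ForkJoinTask<?>> tasks = new ArrayList<>();
        for (int i = 0; i < 15; i++) {
            tasks.add(pools.get(i % pools.size()).submit(() -> {
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        tasks.forEach(ForkJoinTask::join);
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // Both configurations can run all 15 sleeps at once, so both take ~5 ms.
        System.out.println("1 pool x 15 threads: ~" + run(List.of(new ForkJoinPool(15))) + " ms");
        System.out.println("3 pools x 5 threads: ~" + run(List.of(
                new ForkJoinPool(5), new ForkJoinPool(5), new ForkJoinPool(5))) + " ms");
    }
}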
Let's say you have 10 tasks that each sleep for 5 milliseconds. If you have 5 threads to execute those tasks, then you'll roughly see:
Start 5 tasks => Wait 5 ms => Start 5 tasks => Wait 5 ms => Done!
But if you have 10 threads you'll roughly see:
Start 10 tasks => Wait 5 ms => Done!
The first takes a total of 10 milliseconds to execute every task; the second takes only 5 milliseconds. And that's basically where the increased throughput is coming from in your tests.
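You can reproduce that arithmetic outside of JMH with a minimal sketch (my own illustration; the class name and sizes are made up) that times the same batch of sleep tasks on fixed pools of different sizes:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SleepThroughputDemo {
    public static void main(String[] args) throws InterruptedException {
        for (int threads : new int[] {5, 10}) {
            ExecutorService pool = Executors.newFixedThreadPool(threads);
            long start = System.nanoTime();
            for (int i = 0; i < 10; i++) {
                pool.submit(() -> {
                    try {
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.SECONDS);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            // 5 threads -> ~10 ms (two waves of sleeps); 10 threads -> ~5 ms (one wave)
            System.out.println(threads + " threads: ~" + elapsedMs + " ms");
        }
    }
}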
All that said, a `ForkJoinPool` has a set level of parallelism. One way it tries to maintain this parallelism is by spawning a new thread (if the maximum number of threads hasn't already been reached) when one of its threads is blocked. From the documentation:

[A `ForkJoinPool`] attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others. However, no such adjustments are guaranteed in the face of blocked I/O or other unmanaged synchronization. The nested `ForkJoinPool.ManagedBlocker` interface enables extension of the kinds of synchronization accommodated.
You're calling `Thread::sleep` in an unmanaged way. In other words, you're blocking the threads of the pool in such a way that the pool cannot compensate. To prevent that, consider using a `ManagedBlocker`. Here's an example implementation:
import java.time.Duration;
import java.util.concurrent.ForkJoinPool;

public class SleepManagedBlocker implements ForkJoinPool.ManagedBlocker {

    private final Duration sleepDuration;
    private boolean slept; // Does this need to be volatile?

    public SleepManagedBlocker(Duration sleepDuration) {
        this.sleepDuration = sleepDuration;
    }

    @Override
    public boolean block() throws InterruptedException {
        if (!slept) {
            slept = true;
            Thread.sleep(sleepDuration);
        }
        return slept;
    }

    @Override
    public boolean isReleasable() {
        return slept;
    }
}
Then you would replace the `Thread.sleep(5)` calls with:
ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)))
You should see similar throughput increases in your tests without needing to use multiple fork-join pools.
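As a concrete sketch (my rewrite of the question's code, not from the original), the inner parallel loop above would become:

private static void innerParallelLoop() {
    StateData.innerLoop.parallelStream().unordered().forEach(i -> {
        try {
            // managedBlock tells the pool this thread is about to block,
            // so the pool can spawn a spare thread to maintain parallelism
            ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

One caveat: by default the common pool will only create up to 256 spare threads to compensate for blocked workers, which is why the benchmark below raises that limit with `-Djava.util.concurrent.ForkJoinPool.common.maximumSpares=1024`.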
Here is a benchmark showing the effect of using `ManagedBlocker` in this case. It was compiled and executed on Java 23.
import java.time.Duration;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

@Fork(value = 1, jvmArgsAppend = {"-Djava.util.concurrent.ForkJoinPool.common.maximumSpares=1024"})
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public class FJPBenchmarks {

    @Benchmark
    public void runTest(TestState state, Blackhole bh) {
        state.executeOuterLoop(bh);
    }

    @State(Scope.Benchmark)
    public static class TestState {

        private static final Duration SLEEP_DURATION = Duration.ofMillis(5);
        private static final int OUTER_LOOP_COUNT = 32;
        private static final int INNER_LOOP_COUNT = 32;

        @Param({"sequential", "parallel"})
        private String sequentialMode;

        @Param({"common", "separate"})
        private String poolMode;

        @Param({"raw", "managed"})
        private String sleepMode;

        void executeOuterLoop(Blackhole bh) {
            IntStream.range(0, OUTER_LOOP_COUNT)
                    .unordered()
                    .parallel()
                    .forEach(i -> {
                        executeInnerLoop(createInnerLoop());
                        bh.consume(i);
                    });
        }

        IntStream createInnerLoop() {
            var stream = IntStream.range(0, INNER_LOOP_COUNT).unordered();
            return switch (sequentialMode) {
                case "sequential" -> stream.sequential();
                case "parallel" -> stream.parallel();
                default -> throw new IllegalStateException("bad sequentialMode: " + sequentialMode);
            };
        }

        void executeInnerLoop(IntStream loop) {
            var sleeper = getSleeper();
            switch (poolMode) {
                case "common" -> loop.forEach(_ -> sleeper.sleepUnchecked());
                case "separate" -> {
                    try (var pool = new ForkJoinPool()) {
                        loop.forEach(_ -> pool.submit(sleeper::sleepUnchecked).join());
                    }
                }
                default -> throw new IllegalStateException("bad poolMode: " + poolMode);
            }
        }

        Sleeper getSleeper() {
            return switch (sleepMode) {
                case "raw" -> () -> Thread.sleep(SLEEP_DURATION);
                case "managed" -> () -> ForkJoinPool.managedBlock(new SleepManagedBlocker());
                default -> throw new IllegalStateException("bad sleepMode: " + sleepMode);
            };
        }

        @FunctionalInterface
        interface Sleeper {
            void sleep() throws InterruptedException;

            default Void sleepUnchecked() {
                try {
                    sleep();
                } catch (InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
                return null;
            }
        }

        static class SleepManagedBlocker implements ForkJoinPool.ManagedBlocker {

            private boolean slept;

            @Override
            public boolean block() throws InterruptedException {
                if (!slept) {
                    slept = true;
                    Thread.sleep(SLEEP_DURATION);
                }
                return true;
            }

            @Override
            public boolean isReleasable() {
                return slept;
            }
        }
    }
}
Results (from executing the benchmark on a computer with 8 processors):
Benchmark              (poolMode)  (sequentialMode)  (sleepMode)   Mode  Cnt   Score   Error  Units
FJPBenchmarks.runTest      common        sequential          raw  thrpt    5   1.463 ± 0.022  ops/s
FJPBenchmarks.runTest      common        sequential      managed  thrpt    5   5.858 ± 0.026  ops/s
FJPBenchmarks.runTest      common          parallel          raw  thrpt    5   1.454 ± 0.044  ops/s
FJPBenchmarks.runTest      common          parallel      managed  thrpt    5  35.997 ± 0.234  ops/s
FJPBenchmarks.runTest    separate        sequential          raw  thrpt    5   1.426 ± 0.325  ops/s
FJPBenchmarks.runTest    separate        sequential      managed  thrpt    5   1.348 ± 0.157  ops/s
FJPBenchmarks.runTest    separate          parallel          raw  thrpt    5  13.505 ± 1.175  ops/s
FJPBenchmarks.runTest    separate          parallel      managed  thrpt    5  16.864 ± 0.186  ops/s