This is a follow-up from this post: Why are inner parallel streams faster with new pools than with the commonPool for this scenario?
Apologies in advance for the wall of text. This is for JDK 17.
I am currently testing methods with these two scenarios:

1. A `parallelStream` with an inner nested `parallelStream` that uses `ManagedBlocker` to call `Thread::sleep`.
2. A `parallelStream` with an inner nested `parallelStream` that uses a different `ForkJoinPool`, but also uses `ManagedBlocker` to call `Thread::sleep`.
Here's how the code looks:
```java
import java.time.Duration;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;

public class NestedPerf {
    private static final ForkJoinPool sharedInnerPool = new ForkJoinPool();

    public static void main(String[] args) {
        // testInnerParallelLoopWithSharedPoolAndManagedBlock(10000);
        // testInnerParallelLoopWithManagedBlock(2);
    }

    public static void testInnerParallelLoopWithSharedPoolAndManagedBlock(int limit) {
        Map<Integer, Integer> threads = new ConcurrentHashMap<>();
        boolean threwException = false;
        for (int c = 0; c < limit; c++) {
            try {
                StateData.outerLoop.parallelStream().unordered().forEach(i -> {
                    sharedInnerPool.submit(() ->
                            StateData.innerLoop.parallelStream().unordered().forEach(j -> {
                                try {
                                    ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                }
                            })).join();
                    int count = sharedInnerPool.getActiveThreadCount();
                    threads.put(count, count);
                });
            } catch (Exception e) {
                System.out.println(e.getMessage());
                threwException = true;
                int count = ForkJoinPool.commonPool().getActiveThreadCount();
                threads.clear();
                threads.put(count, count);
                break;
            }
        }
        if (threwException) {
            System.out.println("Exception thrown. Max Thread Count = "
                    + threads.keySet().stream().max(Comparator.comparingInt(x -> x)).orElse(-1));
        } else {
            System.out.println("Max Thread Count = "
                    + threads.keySet().stream().max(Comparator.comparingInt(x -> x)).orElse(-1));
        }
    }

    public static void testInnerParallelLoopWithManagedBlock(int limit) {
        Map<Integer, Integer> threads = new ConcurrentHashMap<>();
        boolean threwException = false;
        for (int c = 0; c < limit; c++) {
            try {
                StateData.outerLoop.parallelStream().unordered().forEach(i -> {
                    innerParallelLoopWithManagedBlock();
                    int count = ForkJoinPool.commonPool().getActiveThreadCount();
                    threads.put(count, count);
                });
            } catch (Exception e) {
                System.out.println(e.getMessage());
                threwException = true;
                int count = ForkJoinPool.commonPool().getActiveThreadCount();
                threads.clear();
                threads.put(count, count);
                break;
            }
        }
        if (threwException) {
            System.out.println("Exception thrown. Max Thread Count = "
                    + threads.keySet().stream().max(Comparator.comparingInt(x -> x)).orElse(-1));
        } else {
            System.out.println("Max Thread Count = "
                    + threads.keySet().stream().max(Comparator.comparingInt(x -> x)).orElse(-1));
        }
    }
}
```
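The `SleepManagedBlocker` used above isn't shown in the post; a minimal reconstruction (the class name comes from the snippet, but its internals here are an assumption based on how it's called) might look like this:

```java
import java.time.Duration;
import java.util.concurrent.ForkJoinPool;

// Hypothetical reconstruction of SleepManagedBlocker: it sleeps once for the
// given duration inside block(), which lets the ForkJoinPool compensate by
// adding a spare worker while this thread is parked.
class SleepManagedBlocker implements ForkJoinPool.ManagedBlocker {
    private final Duration sleep;
    private volatile boolean done;

    SleepManagedBlocker(Duration sleep) {
        this.sleep = sleep;
    }

    @Override
    public boolean block() throws InterruptedException {
        if (!done) {
            Thread.sleep(sleep.toMillis());
            done = true;
        }
        return true; // blocking is finished; no need to call block() again
    }

    @Override
    public boolean isReleasable() {
        return done; // true once the sleep has completed
    }
}

public class SleepBlockerDemo {
    // Runs one managed block and reports whether at least 5 ms elapsed.
    static boolean demo() throws InterruptedException {
        long start = System.nanoTime();
        ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
        return System.nanoTime() - start >= 5_000_000L;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```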
Like the title says, the `testInnerParallelLoopWithManagedBlock` method breaks with the following exception:

```
java.util.concurrent.RejectedExecutionException: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
```
But the `testInnerParallelLoopWithSharedPoolAndManagedBlock` method works fine up to a count of 10,000 (and probably more, but I haven't tested beyond that).
I initially thought the issue might be due to the interaction between nested parallel streams and `ManagedBlocker` acting on the same pool, so I created another test method that uses the same shared custom `ForkJoinPool` for both the inner and outer loops:
```java
public static void testBothLoopsWithSharedPoolAndManagedBlock(int limit) {
    Map<Integer, Integer> threads = new ConcurrentHashMap<>();
    boolean threwException = false;
    for (int c = 0; c < limit; c++) {
        try {
            sharedInnerPool.submit(() -> StateData.outerLoop.parallelStream().unordered().forEach(i -> {
                sharedInnerPool.submit(() ->
                        StateData.innerLoop.parallelStream().unordered().forEach(j -> {
                            try {
                                ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        })).join();
                int count = sharedInnerPool.getActiveThreadCount();
                threads.put(count, count);
            })).join();
        } catch (Exception e) {
            System.out.println(e.getMessage());
            threwException = true;
            int count = ForkJoinPool.commonPool().getActiveThreadCount();
            threads.clear();
            threads.put(count, count);
            break;
        }
    }
    if (threwException) {
        System.out.println("Exception thrown. Max Thread Count = "
                + threads.keySet().stream().max(Comparator.comparingInt(x -> x)).orElse(-1));
    } else {
        System.out.println("Max Thread Count = "
                + threads.keySet().stream().max(Comparator.comparingInt(x -> x)).orElse(-1));
    }
}
```
But this one works just fine, so the issue appears to be specific to `ForkJoinPool.commonPool()`.

Does anyone have an idea as to what's happening behind the scenes?
The common `ForkJoinPool`¹ has a relatively low maximum pool size:

> The parameters used to construct the common pool may be controlled by setting the following system properties:
>
> - `java.util.concurrent.ForkJoinPool.common.parallelism` - the parallelism level, a non-negative integer
> - `java.util.concurrent.ForkJoinPool.common.threadFactory` - the class name of a `ForkJoinPool.ForkJoinWorkerThreadFactory`. The system class loader is used to load this class.
> - `java.util.concurrent.ForkJoinPool.common.exceptionHandler` - the class name of a `Thread.UncaughtExceptionHandler`. The system class loader is used to load this class.
> - `java.util.concurrent.ForkJoinPool.common.maximumSpares` - **the maximum number of allowed extra threads to maintain target parallelism (default 256)** [emphasis added].
The custom `ForkJoinPool` you're using is created by calling the no-argument constructor. That constructor:

> Creates a `ForkJoinPool` with parallelism equal to `Runtime.availableProcessors()`, using defaults for all other parameters (see `ForkJoinPool(int, ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean, int, int, int, Predicate, long, TimeUnit)`).
The constructor linked by that documentation says the following regarding the maximum pool size:

> `maximumPoolSize` - the maximum number of threads allowed. When the maximum is reached, attempts to replace blocked threads fail. (However, because creation and termination of different threads may overlap, and may be managed by the given thread factory, this value may be transiently exceeded.) To arrange the same value as is used by default for the common pool, use **256 plus the parallelism level**. (By default, the common pool allows a maximum of 256 spare threads.) Using a value (for example `Integer.MAX_VALUE`) larger than the implementation's total thread limit has the same effect as using this limit (**which is the default**) [emphasis added].
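Following that documentation, a custom pool can be given the same cap as the common pool by passing `parallelism + 256` as `maximumPoolSize` explicitly. A sketch (the non-cap parameter values simply mirror the documented defaults):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class CappedPool {
    // Builds a custom pool whose maximumPoolSize mirrors the common pool's
    // default cap of parallelism + 256, instead of the 32767 implementation
    // limit that the no-argument ForkJoinPool() constructor effectively uses.
    static ForkJoinPool newCappedPool() {
        int parallelism = Runtime.getRuntime().availableProcessors();
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,               // no uncaught-exception handler
                false,              // asyncMode off (LIFO queues), as by default
                0,                  // corePoolSize: default
                parallelism + 256,  // maximumPoolSize: same cap as the common pool
                1,                  // minimumRunnable: default
                null,               // saturate: null => throw RejectedExecutionException
                60, TimeUnit.SECONDS); // keepAliveTime: default
    }

    public static void main(String[] args) {
        ForkJoinPool capped = newCappedPool();
        System.out.println(capped.getParallelism());
        capped.shutdown();
    }
}
```

Swapping such a pool in for `sharedInnerPool` should, in principle, reproduce the same "Thread limit exceeded replacing blocked worker" failure once enough workers are blocked, since replacement attempts hit the same kind of cap the common pool has.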
And if you look at the implementation note in the class Javadoc, you'll see:

> **Implementation Note:** This implementation restricts the maximum number of running threads to **32767** [emphasis added]. Attempts to create pools with greater than the maximum number result in `IllegalArgumentException`. Also, this implementation rejects submitted tasks (that is, by throwing `RejectedExecutionException`) only when the pool is shut down or internal resources have been exhausted.
In short, your custom `ForkJoinPool` has a much larger maximum pool size than the common pool. You can likely prevent the `RejectedExecutionException` with the common pool by raising the number of allowed spare threads via the `java.util.concurrent.ForkJoinPool.common.maximumSpares` system property.
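For example (the value 1024 here is arbitrary; note that the property is read once, when the common pool is initialized, so it must be set before anything touches the common pool, most reliably via `-D` on the command line):

```java
public class RaiseCommonPoolSpares {
    public static void main(String[] args) {
        // Equivalent to passing
        //   -Djava.util.concurrent.ForkJoinPool.common.maximumSpares=1024
        // on the command line. Setting it programmatically only takes effect
        // if it runs before the common pool is first initialized.
        System.setProperty(
                "java.util.concurrent.ForkJoinPool.common.maximumSpares",
                "1024");

        // Parallelism is unchanged; only the cap on compensating spare
        // threads created during managed blocking is raised.
        System.out.println(
                java.util.concurrent.ForkJoinPool.commonPool().getParallelism() > 0);
    }
}
```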
¹ Links are for Java 23, but as far as I can tell the relevant documentation hasn't changed since Java 17.