
RejectedExecutionException when using ManagedBlocker in nested parallelStreams


This is a follow-up from this post: Why are inner parallel streams faster with new pools than with the commonPool for this scenario?

Apologies in advance for the wall of text. This is for JDK 17.

I am currently testing methods that run nested parallel streams in two scenarios: one where the inner streams are submitted to a shared custom ForkJoinPool, and one where everything runs on the common pool.

Here's how the code looks:

import java.time.Duration;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;

public class NestedPerf {
  private static final ForkJoinPool sharedInnerPool = new ForkJoinPool();

  public static void main(String[] args){
//    testInnerParallelLoopWithSharedPoolAndManagedBlock(10000);
//    testInnerParallelLoopWithManagedBlock(2);
  }

  public static void testInnerParallelLoopWithSharedPoolAndManagedBlock(int limit){
    Map<Integer, Integer> threads = new ConcurrentHashMap<>();
    boolean threwException = false;
    for(int c = 0; c < limit; c++) {
      try {
        StateData.outerLoop.parallelStream().unordered().forEach(i -> {
          sharedInnerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
            try {
              ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          })).join();
          int count = sharedInnerPool.getActiveThreadCount();
          threads.put(count, count);
        });
      } catch (Exception e) {
        System.out.println(e.getMessage());
        threwException = true;
        int count = ForkJoinPool.commonPool().getActiveThreadCount();
        threads.clear();
        threads.put(count, count);
        break;
      }
    }
    if(threwException){
      System.out.println("Exception thrown. Max Thread Count = " + threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    } else {
      System.out.println("Max Thread Count = " + threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    }
  }

  public static void testInnerParallelLoopWithManagedBlock(int limit){
    Map<Integer, Integer> threads = new ConcurrentHashMap<>();
    boolean threwException = false;
    for(int c = 0; c < limit; c++) {
      try {
        StateData.outerLoop.parallelStream().unordered().forEach(i -> {
          innerParallelLoopWithManagedBlock();
          int count = ForkJoinPool.commonPool().getActiveThreadCount();
          threads.put(count, count);
        });
      } catch (Exception e) {
        System.out.println(e.getMessage());
        threwException = true;
        int count = ForkJoinPool.commonPool().getActiveThreadCount();
        threads.clear();
        threads.put(count, count);
        break;
      }
    }
    if(threwException){
      System.out.println("Exception thrown. Max Thread Count = " + threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    } else {
      System.out.println("Max Thread Count = " + threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    }
  }
}
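The helper classes StateData and SleepManagedBlocker are referenced above but not shown. Here is a minimal sketch of what they might look like; the class names come from the code above, but the bodies (list sizes, the sleep-once blocking behavior) are my assumptions:

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Assumed shape: two fixed lists driving the outer and inner streams.
// The sizes (100) are placeholders; the originals aren't shown.
class StateData {
  static final List<Integer> outerLoop = IntStream.range(0, 100).boxed().toList();
  static final List<Integer> innerLoop = IntStream.range(0, 100).boxed().toList();
}

// Assumed shape: a ManagedBlocker that blocks exactly once by sleeping
// for the given duration, then reports itself releasable.
class SleepManagedBlocker implements ForkJoinPool.ManagedBlocker {
  private final Duration sleepTime;
  private volatile boolean done;

  SleepManagedBlocker(Duration sleepTime) {
    this.sleepTime = sleepTime;
  }

  @Override
  public boolean block() throws InterruptedException {
    if (!done) {
      Thread.sleep(sleepTime.toMillis());
      done = true; // signal completion so the pool stops compensating
    }
    return true;
  }

  @Override
  public boolean isReleasable() {
    return done;
  }
}
```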

As the title says, the testInnerParallelLoopWithManagedBlock method, which uses the common pool, fails with the following exception:

java.util.concurrent.RejectedExecutionException: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker

But the testInnerParallelLoopWithSharedPoolAndManagedBlock method works fine up to a limit of 10,000 (and probably beyond, but I haven't tested further).

I initially thought the issue might be due to the interaction between nested parallel streams and ManagedBlocker acting on the same pool, so I created another test method that uses the same shared custom ForkJoinPool for both the inner and outer loops:

public static void testBothLoopsWithSharedPoolAndManagedBlock(int limit){
    Map<Integer, Integer> threads = new ConcurrentHashMap<>();
    boolean threwException = false;
    for(int c = 0; c < limit; c++) {
      try {
        sharedInnerPool.submit(()->StateData.outerLoop.parallelStream().unordered().forEach(i -> {
          sharedInnerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
            try {
              ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          })).join();
          int count = sharedInnerPool.getActiveThreadCount();
          threads.put(count, count);
        })).join();
      } catch (Exception e) {
        System.out.println(e.getMessage());
        threwException = true;
        int count = ForkJoinPool.commonPool().getActiveThreadCount();
        threads.clear();
        threads.put(count, count);
        break;
      }
    }
    if(threwException){
      System.out.println("Exception thrown. Max Thread Count = " + threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    } else {
      System.out.println("Max Thread Count = " + threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    }
  }

But this one works just fine, so the issue appears to be specific to ForkJoinPool.commonPool().

Does anyone have an idea what's happening behind the scenes?


Solution

  • The common ForkJoinPool¹ has a relatively low maximum pool size:

    The parameters used to construct the common pool may be controlled by setting the following system properties:

    • java.util.concurrent.ForkJoinPool.common.parallelism - the parallelism level, a non-negative integer
    • java.util.concurrent.ForkJoinPool.common.threadFactory - the class name of a ForkJoinPool.ForkJoinWorkerThreadFactory. The system class loader is used to load this class.
    • java.util.concurrent.ForkJoinPool.common.exceptionHandler - the class name of a Thread.UncaughtExceptionHandler. The system class loader is used to load this class.
    • java.util.concurrent.ForkJoinPool.common.maximumSpares - the maximum number of allowed extra threads to maintain target parallelism (default 256) [emphasis added].

    The custom ForkJoinPool you're using is created by calling the no-argument constructor. That constructor:

    Creates a ForkJoinPool with parallelism equal to Runtime.availableProcessors(), using defaults for all other parameters (see ForkJoinPool(int, ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean, int, int, int, Predicate, long, TimeUnit)).

    The constructor linked by that documentation says the following regarding the maximum pool size:

    maximumPoolSize - the maximum number of threads allowed. When the maximum is reached, attempts to replace blocked threads fail. (However, because creation and termination of different threads may overlap, and may be managed by the given thread factory, this value may be transiently exceeded.) To arrange the same value as is used by default for the common pool, use 256 plus the parallelism level. (By default, the common pool allows a maximum of 256 spare threads.) Using a value (for example Integer.MAX_VALUE) larger than the implementation's total thread limit has the same effect as using this limit (which is the default) [emphasis added].

    And if you look at the implementation note in the class Javadoc, you'll see:

    Implementation Note:

    This implementation restricts the maximum number of running threads to 32767 [emphasis added]. Attempts to create pools with greater than the maximum number result in IllegalArgumentException. Also, this implementation rejects submitted tasks (that is, by throwing RejectedExecutionException) only when the pool is shut down or internal resources have been exhausted.

    In short, your custom ForkJoinPool has a much larger maximum pool size: the 32767 implementation limit, versus parallelism + 256 for the common pool. You can likely prevent the RejectedExecutionException with the common pool by increasing the value of the java.util.concurrent.ForkJoinPool.common.maximumSpares system property.
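    Note that the common pool reads that property only when it is first initialized, so the reliable way to set it is on the command line; setting it programmatically works only if it runs before anything touches the ForkJoinPool class. A sketch (the value 1024 is an arbitrary example, not a recommendation):

```java
public class RaiseMaxSpares {
  public static void main(String[] args) {
    // Equivalent, and safer, as a command-line flag:
    //   java -Djava.util.concurrent.ForkJoinPool.common.maximumSpares=1024 NestedPerf
    // This call must execute before the ForkJoinPool class is initialized,
    // or the common pool will already have been built with the default 256.
    System.setProperty(
        "java.util.concurrent.ForkJoinPool.common.maximumSpares", "1024");

    // First touch of ForkJoinPool happens after the property is set.
    System.out.println(
        java.util.concurrent.ForkJoinPool.commonPool().getParallelism());
  }
}
```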


    ¹ Links are for Java 23, but as far as I can tell the relevant documentation hasn't changed since Java 17.