Tags: java, performance, java-stream, benchmarking

Why are inner parallel streams faster with new pools than with the commonPool for this scenario?


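I recently ran a benchmark comparing the performance of nested streams in 3 cases:

  • Parallel outer stream, with each parallel inner stream run in a new ForkJoinPool
  • Parallel outer stream and sequential inner stream, both in the commonPool
  • Parallel outer stream and parallel inner stream, both in the commonPool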

Here's the benchmark code (I've used JMH):

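import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;

public class NestedPerf {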
  @State(Scope.Benchmark)
  public static class StateData{
    public static final List<Integer> outerLoop = IntStream.range(0, 32).boxed().toList();
    public static final List<Integer> innerLoop = IntStream.range(0, 32).boxed().toList();
  }
  private static void runInNewPool(Runnable task) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      pool.submit(task).join();
    } finally {
      pool.shutdown();
    }
  }
  private static void innerParallelLoop() {
    StateData.innerLoop.parallelStream().unordered().forEach(i -> {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }
  private static void innerSequentialLoop() {
    StateData.innerLoop.stream().unordered().forEach(i -> {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }
  @Benchmark
  public void testingNewPool(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      runInNewPool(NestedPerf::innerParallelLoop);
      bh.consume(i);
    });
  }

  @Benchmark
  public void testingCommonPoolWithSequentialInner(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      innerSequentialLoop();
      bh.consume(i);
    });
  }
  @Benchmark
  public void testingCommonPool(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      innerParallelLoop();
      bh.consume(i);
    });
  }
}

And here is the output:

Benchmark                                         Mode  Cnt   Score   Error  Units
NestedPerf.testingCommonPool                     thrpt   25   1.935 ± 0.005  ops/s
NestedPerf.testingCommonPoolWithSequentialInner  thrpt   25   1.744 ± 0.007  ops/s
NestedPerf.testingNewPool                        thrpt   25  22.648 ± 0.559  ops/s

The difference between the method that creates new pools and the methods that use the commonPool is surprising. Does anyone have an idea why creating new pools makes things roughly 12x faster for this benchmark?

If it helps, I'm running this on a Core i7 10850H system with 12 available CPUs (hexcore + hyperthreading).


Solution

    Why Throughput Increases

    Your tasks are simply calls to Thread::sleep. Sleeping blocks the calling thread, so the OS will not schedule it again until the specified duration elapses, which leaves the CPU free to execute other threads. In other words, your tasks are not CPU-bound and do not burden the CPU, so throwing more threads at your set of tasks increases throughput without overwhelming the CPU.

    By using multiple fork-join pools, you are effectively increasing the number of threads available to execute your tasks. It's not much different from simply increasing the number of threads in a single pool. Whether you have 1 pool with 15 threads or 3 pools with 5 threads each, you still end up with a total of 15 threads.
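
    To see how many threads are in play on a given machine, something like the following sketch can help. The class name PoolSizes and the estimate in main are my own illustration, not part of the question's benchmark. The estimate assumes that the calling thread joins the common pool's workers in running the outer stream, and that each concurrently running outer task owns one fully populated new pool.

```java
import java.util.concurrent.ForkJoinPool;

public class PoolSizes {

  // Target parallelism of the shared common pool.
  static int commonParallelism() {
    return ForkJoinPool.getCommonPoolParallelism();
  }

  // Target parallelism of a pool created with the no-arg constructor.
  static int newPoolParallelism() {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      return pool.getParallelism();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    int cpus = Runtime.getRuntime().availableProcessors();
    System.out.println("available processors:    " + cpus);
    System.out.println("common pool parallelism: " + commonParallelism());
    System.out.println("new pool parallelism:    " + newPoolParallelism());

    // Rough estimate (assumption): outer tasks running at once, each with its own pool.
    int concurrentOuterTasks = commonParallelism() + 1; // workers + calling thread
    System.out.println("threads that may sleep at once with new pools: "
        + concurrentOuterTasks * newPoolParallelism());
  }
}
```

    The last number is only an upper-bound style estimate, but it shows why the new-pool variant can have far more sleeps in flight than the common pool alone.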

    Let's say you have 10 tasks that each sleep for 5 milliseconds. If you have 5 threads to execute those tasks, then you'll roughly see:

    Start 5 tasks => Wait 5 ms => Start 5 tasks => Wait 5 ms => Done!
    

    But if you have 10 threads you'll roughly see:

    Start 10 tasks => Wait 5 ms => Done!
    

    The first takes a total of 10 milliseconds to execute every task, the second only takes 5 milliseconds. And that's basically where the increased throughput is coming from in your tests.
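
    The same arithmetic can be written down as a small helper. This is a simplified model, not a measurement: it assumes tasks run in waves of at most one task per thread, that each wave lasts exactly one sleep period, and it ignores scheduling overhead. The class and method names are hypothetical.

```java
public class BatchEstimate {

  // Rough wall-clock estimate in milliseconds: tasks run in waves of at most
  // `threads` tasks at a time, and each wave lasts one sleep period.
  static long estimatedMillis(int tasks, int threads, long sleepMillis) {
    int waves = (tasks + threads - 1) / threads; // ceiling division
    return waves * sleepMillis;
  }

  public static void main(String[] args) {
    System.out.println(estimatedMillis(10, 5, 5));       // 10 tasks on 5 threads
    System.out.println(estimatedMillis(10, 10, 5));      // 10 tasks on 10 threads
    System.out.println(estimatedMillis(32 * 32, 12, 5)); // the question's 1024 sleeps
  }
}
```

    For the question's 32 x 32 sleeps, and assuming about 12 threads doing the work, the model gives 430 ms per operation, or roughly 2.3 ops/s. That is in the same ballpark as the 1.9 ops/s measured for the commonPool case.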


    Maintaining Parallelism

    All that said, a ForkJoinPool has a set level of parallelism. One way it tries to maintain this parallelism is by spawning a new thread (if the maximum number of threads hasn't already been reached) when one of its threads is blocked. From the documentation:

    [A ForkJoinPool] attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others. However, no such adjustments are guaranteed in the face of blocked I/O or other unmanaged synchronization. The nested ForkJoinPool.ManagedBlocker interface enables extension of the kinds of synchronization accommodated.

    You're calling Thread::sleep in an unmanaged way. In other words, you're blocking the threads of the pool in such a way that the pool cannot compensate. To prevent that, consider using a ManagedBlocker. Here's an example implementation:

    import java.time.Duration;
    import java.util.concurrent.ForkJoinPool;
    
    public class SleepManagedBlocker implements ForkJoinPool.ManagedBlocker {
    
      private final Duration sleepDuration;
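      // Only read and written by the thread that calls managedBlock,
      // so it does not need to be volatile as long as the blocker is not shared.
      private boolean slept;
    
      public SleepManagedBlocker(Duration sleepDuration) {
        this.sleepDuration = sleepDuration;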
      }
    
      @Override
      public boolean block() throws InterruptedException {
        if (!slept) {
          slept = true;
          Thread.sleep(sleepDuration);
        }
        return slept;
      }
    
      @Override
      public boolean isReleasable() {
        return slept;
      }
    }
    

    Then you would replace the Thread.sleep(5) calls with:

    ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)))
    

    You should see similar throughput increases in your tests without needing to use multiple fork-join pools.
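
    For completeness, here is a minimal, self-contained sketch of what the inner loop might look like with a managed sleep. The names ManagedSleepDemo, Sleep and runInnerLoop are made up for this example, and the blocker is a compact stand-in that uses Thread.sleep(long) so it also compiles on older JDKs. It counts completed sleeps rather than measuring time, so treat it as a wiring example, not a benchmark.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ManagedSleepDemo {

  // Compact blocker: sleeps once, then reports itself as releasable.
  static final class Sleep implements ForkJoinPool.ManagedBlocker {
    private final long millis;
    private boolean done;

    Sleep(long millis) {
      this.millis = millis;
    }

    @Override
    public boolean block() throws InterruptedException {
      if (!done) {
        Thread.sleep(millis);
        done = true;
      }
      return true; // no further blocking needed
    }

    @Override
    public boolean isReleasable() {
      return done;
    }
  }

  // Runs `count` managed sleeps on a parallel stream and returns how many completed.
  static int runInnerLoop(int count, long sleepMillis) {
    AtomicInteger completed = new AtomicInteger();
    IntStream.range(0, count).parallel().unordered().forEach(i -> {
      try {
        // Lets the pool know this worker is about to block, so it may compensate.
        ForkJoinPool.managedBlock(new Sleep(sleepMillis));
        completed.incrementAndGet();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    return completed.get();
  }

  public static void main(String[] args) {
    System.out.println("completed sleeps: " + runInnerLoop(32, 5));
  }
}
```

    Each task creates its own blocker instance, which keeps the blocker's state confined to the thread that calls managedBlock.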


    JMH Benchmarks

    Here is a benchmark showing the effect of using ManagedBlocker in this case. It was compiled and executed on Java 23.

    import java.time.Duration;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.IntStream;
    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.BenchmarkMode;
    import org.openjdk.jmh.annotations.Fork;
    import org.openjdk.jmh.annotations.Measurement;
    import org.openjdk.jmh.annotations.Mode;
    import org.openjdk.jmh.annotations.OutputTimeUnit;
    import org.openjdk.jmh.annotations.Param;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.State;
    import org.openjdk.jmh.annotations.Warmup;
    import org.openjdk.jmh.infra.Blackhole;
    
    @Fork(value = 1, jvmArgsAppend = {"-Djava.util.concurrent.ForkJoinPool.common.maximumSpares=1024"})
    @Warmup(iterations = 5)
    @Measurement(iterations = 5)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.SECONDS)
    public class FJPBenchmarks {
    
      @Benchmark
      public void runTest(TestState state, Blackhole bh) {
        state.executeOuterLoop(bh);
      }
    
      @State(Scope.Benchmark)
      public static class TestState {
    
        private static final Duration SLEEP_DURATION = Duration.ofMillis(5);
        private static final int OUTER_LOOP_COUNT = 32;
        private static final int INNER_LOOP_COUNT = 32;
    
        @Param({"sequential", "parallel"})
        private String sequentialMode;
    
        @Param({"common", "separate"})
        private String poolMode;
    
        @Param({"raw", "managed"})
        private String sleepMode;
    
        void executeOuterLoop(Blackhole bh) {
          IntStream.range(0, OUTER_LOOP_COUNT)
              .unordered()
              .parallel()
              .forEach(i -> {
                executeInnerLoop(createInnerLoop());
                bh.consume(i);
              });
        }
    
        IntStream createInnerLoop() {
          var stream = IntStream.range(0, INNER_LOOP_COUNT).unordered();
          return switch (sequentialMode) {
            case "sequential" -> stream.sequential();
            case "parallel" -> stream.parallel();
            default -> throw new IllegalStateException("bad sequentialMode: " + sequentialMode);
          };
        }
    
        void executeInnerLoop(IntStream loop) {
          var sleeper = getSleeper();
          switch (poolMode) {
            case "common" -> loop.forEach(_ -> sleeper.sleepUnchecked());
            case "separate" -> {
              try (var pool = new ForkJoinPool()) {
                loop.forEach(_ -> pool.submit(sleeper::sleepUnchecked).join());
              }
            }
            default -> throw new IllegalStateException("bad poolMode: " + poolMode);
          }
        }
    
        Sleeper getSleeper() {
          return switch (sleepMode) {
            case "raw" -> () -> Thread.sleep(SLEEP_DURATION);
            case "managed" -> () -> ForkJoinPool.managedBlock(new SleepManagedBlocker());
            default -> throw new IllegalStateException("bad sleepMode: " + sleepMode);
          };
        }
    
        @FunctionalInterface
        interface Sleeper {
      
          void sleep() throws InterruptedException;
    
          default Void sleepUnchecked() {
            try {
              sleep();
            } catch (InterruptedException ex) {
              throw new RuntimeException(ex);
            }
            return null;
          }
        }
    
        static class SleepManagedBlocker implements ForkJoinPool.ManagedBlocker {
    
          private boolean slept;
    
          @Override
          public boolean block() throws InterruptedException {
            if (!slept) {
              slept = true;
              Thread.sleep(SLEEP_DURATION);
            }
            return true;
          }
    
          @Override
          public boolean isReleasable() {
            return slept;
          }
        }
      }
    }
    

    Results (from executing the benchmark on a computer with 8 processors):

    Benchmark              (poolMode)  (sequentialMode)  (sleepMode)   Mode  Cnt   Score   Error  Units
    FJPBenchmarks.runTest      common        sequential          raw  thrpt    5   1.463 ± 0.022  ops/s
    FJPBenchmarks.runTest      common        sequential      managed  thrpt    5   5.858 ± 0.026  ops/s
    FJPBenchmarks.runTest      common          parallel          raw  thrpt    5   1.454 ± 0.044  ops/s
    FJPBenchmarks.runTest      common          parallel      managed  thrpt    5  35.997 ± 0.234  ops/s
    FJPBenchmarks.runTest    separate        sequential          raw  thrpt    5   1.426 ± 0.325  ops/s
    FJPBenchmarks.runTest    separate        sequential      managed  thrpt    5   1.348 ± 0.157  ops/s
    FJPBenchmarks.runTest    separate          parallel          raw  thrpt    5  13.505 ± 1.175  ops/s
    FJPBenchmarks.runTest    separate          parallel      managed  thrpt    5  16.864 ± 0.186  ops/s