javaforkjoinpooljava-17openjdk-17

Why does Java 17 throw a RejectedExecutionException when adding tasks to a ForkJoinPool?


I used Java 16 to make requests to an API over HTTP. To speed this up overall, I've loaded this onto a custom ForkJoinPool. I've compiled a reproducing example below.

Since moving to Java 17 (openjdk build 17.0.1+12-39), this throws a RejectedExecutionException:

Caused by: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
    at java.base/java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:1819)
    at java.base/java.util.concurrent.ForkJoinPool.compensatedBlock(ForkJoinPool.java:3446)
    at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3432)
    at java.base/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2072)
    at java.net.http/jdk.internal.net.http.HttpClientImpl.send(HttpClientImpl.java:553)
    at java.net.http/jdk.internal.net.http.HttpClientFacade.send(HttpClientFacade.java:119)
    at Test.lambda$retrieveMany$1(Test.java:30)

Why does this happen? Did something change regarding ForkJoinPool that I'm unaware of?

Code

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.toList;

public class Test {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final List<String> urls = List.of("https://stackoverflow.com", "https://stackoverflow.com", "https://stackoverflow.com");

        // This succeeds on JDK 16, 17
        retrieveMany(urls, 4);

        // This fails on JDK 17, but succeeds on 16
        retrieveMany(urls, 3);
    }

    private static List<String> retrieveMany(List<String> urls, int threads) throws InterruptedException, ExecutionException {
        return new ForkJoinPool(threads, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (t, e) -> {}, true, 0, threads, 1, null, 1, MINUTES)
                .submit(() -> urls.parallelStream()
                        .map(url -> {
                            try {
                                return HttpClient.newBuilder().build().send(HttpRequest.newBuilder(URI.create(url)).build(), BodyHandlers.ofString()).body();
                            } catch (IOException | InterruptedException aE) { }
                            return null;
                        })
                        .collect(toList()))
                .get();
    }

}

Solution

  • You have submitted one task, but that uses parallelStream() internally which then runs each http on different threads of the same fork join pool.

    There is a difference in the way JDK16 and 17 deal with the situation that the all the available threads in the pool are in use - this is where the saturated parameter becomes relevent.

    When threads > urls.size() the pool is never saturated, but in your second case threads == urls.size() so all the threads are in use. Replace null in the constructor of the ForkJoinPool by a saturate variable to see when saturate test condition is triggered:

    Predicate<? super ForkJoinPool> saturate = pool -> {
        boolean allow = false;
        System.out.println(Thread.currentThread().getName()+" saturate:"+allow);
        return allow;
    };
    

    On JDK16 saturate predicate gets called several times but continues, whereas on JDK17 the processing stops on first call if it returns false. If you switch allow = true then JDK17 won't send RejectedExecutionException when the number of in-progress requests is the same as the threads in use for the parallelStream(), and will continue processing further requests when other requests are completed.