
Reactor API returning Task capacity of bounded elastic scheduler Exception


I am new to Project Reactor (using reactor-core:3.4.18) and I am trying to parallelize the consumer side of a Flux subscription. I create a Scheduler with a maximum of 2 threads, but it fails with the exception below, whereas with a thread count of 4 it works fine.

Scheduler schedulers = Schedulers.newBoundedElastic(2, 2, "PublishedThread");

        Flux.range(1, 10)
                .parallel()
                .runOn(schedulers)
                .doOnNext(e -> printName(e))
                .subscribe();

[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$ReactorRejectedExecutionException: Task capacity of bounded elastic scheduler reached while scheduling 1 tasks (3/2)
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$ReactorRejectedExecutionException: Task capacity of bounded elastic scheduler reached while scheduling 1 tasks (3/2)
Caused by: reactor.core.Exceptions$ReactorRejectedExecutionException: Task capacity of bounded elastic scheduler reached while scheduling 1 tasks (3/2)
    at reactor.core.Exceptions.failWithRejected(Exceptions.java:277)

Can someone help me understand why using fewer threads throws this exception?


Solution

  • This behaviour is documented in the scheduler creation method you use. From Schedulers.newBoundedElastic(int, int, String):

    The maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded by the provided queuedTaskCap. Past that point, a RejectedExecutionException is thrown.

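    So, with the scheduler you created (threadCap = 2, queuedTaskCap = 2), each of the 2 threads can hold at most 2 queued tasks, i.e. roughly 4 pending tasks in total. Since you did not specify the parallelism (number of rails) when calling Flux.parallel(), it defaults to the number of CPU cores, so the Flux most likely tries to schedule more tasks than that. The (3/2) in the error message suggests that a third task was submitted to a thread whose queue is capped at 2.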
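    The arithmetic behind this can be sketched with plain JDK code. This is a back-of-the-envelope model, not Reactor's implementation: it assumes that every rail is pinned to one backing thread, that rails are spread evenly across the threads, and that each rail keeps at most one task pending at a time. The class and method names are hypothetical, and the rail count of 8 stands in for parallel() on an 8-core machine.

```java
class RailCapacityEstimate {

    // Worst-case number of rails sharing one backing thread, assuming rails are
    // spread as evenly as possible over the threads (ceiling division).
    static int railsPerThread(int rails, int threadCap) {
        return (rails + threadCap - 1) / threadCap;
    }

    // Assumption: each rail keeps at most one task pending on its thread, so the
    // worst-case queue length of a thread equals the number of rails it serves.
    static boolean fitsInQueue(int rails, int threadCap, int queuedTaskCap) {
        return railsPerThread(rails, threadCap) <= queuedTaskCap;
    }

    public static void main(String[] args) {
        int rails = 8; // hypothetical: parallel() without argument on an 8-core machine

        System.out.println("2 threads, cap 2: " + fitsInQueue(rails, 2, 2)); // false
        System.out.println("4 threads, cap 2: " + fitsInQueue(rails, 4, 2)); // true
    }
}
```

    Under these assumptions, 8 rails on 2 threads can put up to 4 tasks into a queue capped at 2, which would explain the rejection, while 4 threads leave at most 2 rails per thread.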

    Workarounds:

    Here is a full example creating a ParallelFlux running 8 rails on a new parallel scheduler of 3 threads, and properly waiting for it to finish:

    import java.util.concurrent.CountDownLatch;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Scheduler;
    import reactor.core.scheduler.Schedulers;
    
    public class TestParallelLimitations {
    
        public static void main(String[] args) throws Exception {
            Scheduler scheduler = Schedulers.newParallel("PublishedThread", 3);
            try (AutoCloseable disposeScheduler = scheduler::dispose) {
                var flow = Flux.range(1, 12)
                        .parallel(8)
                        .runOn(scheduler)
                        .map(i -> "["+Thread.currentThread().getName()+"] -> "+i);
    
                // onComplete is invoked once per rail, so the latch needs one count per rail
                var barrier = new CountDownLatch(flow.parallelism());
    
                flow.subscribe(System.out::println,
                        err -> {
                            System.err.println("ERROR: " + err.getMessage());
                            // on error, drain the latch so that await() below cannot block forever
                            long remaining = barrier.getCount();
                            while (remaining > 0) {
                                barrier.countDown();
                                remaining = barrier.getCount();
                            }
                        },
                        barrier::countDown);
    
                barrier.await();
            }
        }
    }
    

    The output of this example program is (thread assignment and ordering may differ between runs):

    [PublishedThread-3] -> 6
    [PublishedThread-2] -> 5
    [PublishedThread-2] -> 8
    [PublishedThread-3] -> 3
    [PublishedThread-3] -> 11
    [PublishedThread-2] -> 2
    [PublishedThread-2] -> 10
    [PublishedThread-1] -> 4
    [PublishedThread-1] -> 12
    [PublishedThread-1] -> 7
    [PublishedThread-1] -> 1
    [PublishedThread-1] -> 9
    

    We can see that all 12 elements have been processed, dispatched across the 3 threads of the created scheduler.