I am new to Project Reactor (using reactor-core:3.4.18) and I am trying to parallelize a Flux consumer subscription. When I create a Scheduler with a maximum of 2 threads, it fails with the following exception, whereas with a thread count of 4 it works fine.
Scheduler schedulers = Schedulers.newBoundedElastic(2, 2, "PublishedThread");
Flux.range(1, 10)
    .parallel()
    .runOn(schedulers)
    .doOnNext(e -> printName(e))
    .subscribe();
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$ReactorRejectedExecutionException: Task capacity of bounded elastic scheduler reached while scheduling 1 tasks (3/2)
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$ReactorRejectedExecutionException: Task capacity of bounded elastic scheduler reached while scheduling 1 tasks (3/2)
Caused by: reactor.core.Exceptions$ReactorRejectedExecutionException: Task capacity of bounded elastic scheduler reached while scheduling 1 tasks (3/2)
    at reactor.core.Exceptions.failWithRejected(Exceptions.java:277)
Can someone help me understand why using fewer threads throws this exception?
This behaviour is documented in the scheduler creation method you use. From Schedulers.newBoundedElastic(int, int, String):
The maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded by the provided queuedTaskCap. Past that point, a RejectedExecutionException is thrown.
So, with the scheduler you created (2 threads, each with a queue capped at 2 tasks), you cannot dispatch more than 4 tasks at the same time. Since you did not specify the parallelism (the number of rails) when calling Flux.parallel, it defaults to the number of CPU cores, so I would say that the Flux tries to create more than 4 parallel tasks.
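To make the capacity arithmetic concrete, here is a minimal plain-JDK sketch of the same idea. It is only an analogy, not how Reactor's bounded elastic scheduler is implemented: it assumes a ThreadPoolExecutor with 2 threads and a 2-slot queue, and the class and method names (BoundedCapacityDemo, countRejected) are made up for illustration. With 8 tasks submitted at once, 2 run, 2 wait in the queue, and the rest are rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a plain-JDK analogy, NOT Reactor's implementation.
class BoundedCapacityDemo {

    // Submits `tasks` blocking tasks to a 2-thread pool with a 2-slot queue
    // and returns how many submissions were rejected.
    static int countRejected(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        // Keeps every accepted task busy until all submissions are done.
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // Both threads are busy and the queue is full.
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 running + 2 queued are accepted, the other 4 are rejected.
        System.out.println("rejected = " + countRejected(8)); // prints "rejected = 4"
    }
}
```

Submitting 4 tasks or fewer to the same pool rejects nothing, which mirrors why lowering the number of concurrent tasks (or raising the capacity) makes the error go away.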
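Workarounds:
Limit the parallelism (number of rails) explicitly:
Flux.range(1, 10).parallel(3)
Or use a parallel scheduler instead of a bounded elastic one:
Scheduler scheduler = Schedulers.newParallel("PublishedThread", 2);
Flux.range(0, 10).parallel().runOn(scheduler);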
Here is a full example creating a ParallelFlux running 8 rails on a new parallel scheduler of 3 threads, and properly waiting for it to finish:
import java.util.concurrent.CountDownLatch;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class TestParallelLimitations {
    public static void main(String[] args) throws Exception {
        Scheduler scheduler = Schedulers.newParallel("PublishedThread", 3);
        try (AutoCloseable disposeScheduler = scheduler::dispose) {
            var flow = Flux.range(1, 12)
                    .parallel(8)
                    .runOn(scheduler)
                    .map(i -> "[" + Thread.currentThread().getName() + "] -> " + i);
            var barrier = new CountDownLatch(flow.parallelism());
            flow.subscribe(System.out::println,
                    err -> {
                        System.err.println("ERROR: " + err.getMessage());
                        long remaining = barrier.getCount();
                        while (remaining > 0) {
                            barrier.countDown();
                            remaining = barrier.getCount();
                        }
                    },
                    barrier::countDown);
            barrier.await();
        }
    }
}
The output of this example program is:
[PublishedThread-3] -> 6
[PublishedThread-2] -> 5
[PublishedThread-2] -> 8
[PublishedThread-3] -> 3
[PublishedThread-3] -> 11
[PublishedThread-2] -> 2
[PublishedThread-2] -> 10
[PublishedThread-1] -> 4
[PublishedThread-1] -> 12
[PublishedThread-1] -> 7
[PublishedThread-1] -> 1
[PublishedThread-1] -> 9
We can see that all 12 elements have been processed/dispatched on the 3 threads from the created scheduler.
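The grouping in that output can be reproduced with simple arithmetic. The sketch below uses no Reactor code at all. It assumes elements are dealt to the 8 rails in round-robin order (which is how Flux.parallel is documented to divide data) and, as a further assumption that merely matches the output above, that rails are bound to the 3 threads in round-robin order too. The class and method names (RailAssignmentSketch, assign) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: models the element -> rail -> thread mapping only.
class RailAssignmentSketch {

    // Returns, for each thread number (1-based), the elements it would process.
    static Map<Integer, List<Integer>> assign(int elements, int rails, int threads) {
        Map<Integer, List<Integer>> byThread = new TreeMap<>();
        for (int value = 1; value <= elements; value++) {
            int rail = (value - 1) % rails;   // round-robin over rails
            int thread = rail % threads + 1;  // assumed round-robin over threads
            byThread.computeIfAbsent(thread, t -> new ArrayList<>()).add(value);
        }
        return byThread;
    }

    public static void main(String[] args) {
        // prints {1=[1, 4, 7, 9, 12], 2=[2, 5, 8, 10], 3=[3, 6, 11]}
        System.out.println(assign(12, 8, 3));
    }
}
```

Each thread gets the same set of elements as in the run above (PublishedThread-1 has 1, 4, 7, 9, 12, and so on). Only the grouping is modeled here; the order in which lines are printed depends on thread timing and can differ between runs.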