multithreadingspring-webfluxreactive-programmingproject-reactor

What is the threading model of Spring Reactor, and .subscribe() seems to not utilise more than a single thread?


How come no default threadPool is spawned/used to more efficiently process each integer?
I'm testing with testing with reactive chains. Based off subscribing to the publisher:

Subscribe is an asynchronous process. It means that when you call subscribe, it launch processing in the background, then return immediately.

Flux.range(0, 10000)
    .doOnNext(i -> {
        System.out.println("start " + i + " Thread: " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch(InterruptedException e) {
            throw new RuntimeException(e);
        }
    })
    .flatMap(i -> {
        System.out.println("end" + i + " Thread: " + Thread.currentThread().getName());
        return Mono.just(i);
    })
    .doOnError(err -> {
        throw new RuntimeException(err.getMessage());
    })
    .subscribe();
System.out.println("Thread: " + Thread.currentThread().getName() + " Hello world");

My main confusion is the output:

start 0 Thread: main
end0 Thread: main
start 1 Thread: main
end1 Thread: main
start 2 Thread: main
end2 Thread: main
start 3 Thread: main
end3 Thread: main
start 4 Thread: main
end4 Thread: main
Thread: main Hello world

However, my observation seems to differ where I observed a blocking behavior here since the printing of "Hello World". This must first wait for the processing of the Flux Reactive chain to first finish as that chain is using and blocking(?) the main thread.

Uncommenting subscribeOn() has a different, 'correct' sort of behavior:

Flux.range(0, 10000)
    .doOnNext(i -> {
        System.out.println("start " + i + " Thread: " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch(InterruptedException e) {
            throw new RuntimeException(e);
        }
    })
    .flatMap(i -> {
        System.out.println("end" + i + " Thread: " + Thread.currentThread().getName());
        return Mono.just(i);
    })
    .doOnError(err -> {
        throw new RuntimeException(err.getMessage());
    })
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe();
System.out.println("Thread: " + Thread.currentThread().getName() + " Hello world");
Thread: main Hello world
start 0 Thread: boundedElastic-1
end0 Thread: boundedElastic-1
start 1 Thread: boundedElastic-1
end1 Thread: boundedElastic-1
start 2 Thread: boundedElastic-1

My understanding of this is because we now specify a threadPool the reactive chain must use for its processing. The main thread is free to behave unblocked and asynchronously printing "Hello World" first. Likewise, if we were to now replace .subscribe() with .blockLast():

Flux.range(0, 5)
    .doOnNext(i -> {
        System.out.println("start " + i + " Thread: " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch(InterruptedException e) {
            throw new RuntimeException(e);
        }
    })
    .flatMap(i -> {
        //                        if (i == 100) return Mono.error(new RuntimeException("Died cuz i = 100"));
        System.out.println("end" + i + " Thread: " + Thread.currentThread().getName());
        return Mono.just(i);
    })
    .doOnError(err -> {
        throw new RuntimeException(err.getMessage());
    })
    .subscribeOn(Schedulers.boundedElastic())
    .blockLast();
System.out.println("Thread: " + Thread.currentThread().getName() + " Hello world");

This is changed to blocking instead of asynchronous. Even if we specified a different thread pool to be used for the reactive chain's processing is still blocked until the reactive chain returns either a success or error signal before releasing the main thread. The main thread being the caller thread.

Is my understanding so far sound? And, where am I going wrong in my understanding of Project Reactor's default threading behavior?


Solution

  • I advise you to read the section related to threading in the official documentation. It should provide you with a good understanding of what happens. I will try to sum it up as well as I can.

    Why Flux is processed on the calling thread ?

    Flux and Mono objects model a suspendable, resumable chain of operations. It means the engine can "park" actions, and later schedule their execution on an available thread.

    Now, when you call subscribe on a Publisher, you start the chain of operation. Its first actions are launched on the calling thread, as cited from the doc:

    Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made.

    In case your flow is simple enough, there is a good chance that the entire flux/mono will be processed on the same thread.

    Does this mean it is called synchronously / on program foreground ?

    That might give the illusion of synchronous processing, but it is not.

    We can already see it in your first example. You create a range of a thousand values, but only 4 values are printed before Thread: main Hello world message. It shows that the processing has started, but has been "paused" to let your main program continue.

    We can also see this more clearly in the following example:

    // Assemble the flow of operations
    Flux flow = Flux.range(0, 5)
            .flatMap(i -> Mono
                    .just("flatMap on [" + Thread.currentThread().getName() + "] -> " + i)
                    .delayElement(Duration.ofMillis(50)));
    
    // Launch processing
    Disposable execStatus = flow.subscribe(System.out::println);
    
    System.out.println("SUBSCRIPTION DONE");
    
    // Prevent program to stop before the flow is over.
    do {
        System.out.println("Waiting for the flow to end...");
        Thread.sleep(50);
    } while (!execStatus.isDisposed());
    
    System.out.println("FLUX PROCESSED");
    

    This program prints:

    SUBSCRIPTION DONE
    Waiting for the flow to end...
    flatMap on [main] -> 0
    flatMap on [main] -> 1
    flatMap on [main] -> 3
    Waiting for the flow to end...
    flatMap on [main] -> 4
    flatMap on [main] -> 2
    FLUX PROCESSED
    

    If we look at it, we can see that messages from the main program are interleaved with the main program, so even if the flow is executed on the main thread, it is done in the background nonetheless.

    This proves what is stated by subscribe(Consumer) apidoc:

    Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread.

    Why no other threads is involved ?

    Now, why no other thread has been used ? Well, this is a complex question. In this case, I would say that the engine decided that no other thread was necessary to perform the pipeline with good performance. Switching between threads has a cost, so if it can be avoided, I think Reactor avoids it.

    Can other threads be involved ?

    The documentation states:

    Some operators use a specific scheduler from Schedulers by default

    It means that depending on the pipeline, its tasks might be dispatched on threads provided by a Scheduler.

    In fact, flatMap should do that. If we only slightly modify the example, we will see operations being dispatched to the parallel scheduler. All we have to do is limit the concurrency (yes, I know, this is not very intuitive). By default, flatMap uses a concurrency factor of 256. It means that it can start 256 operations in the same time (roughly explained). Let's constrain it to 2:

    Flux flow = Flux.range(0, 5)
            .flatMap(i -> Mono
                    .just("flatMap on [" + Thread.currentThread().getName() + "] -> " + i)
                    .delayElement(Duration.ofMillis(50)),
                    2);
    

    Now, the program prints:

    SUBSCRIPTION DONE
    Waiting for the flow to end...
    flatMap on [main] -> 0
    Waiting for the flow to end...
    flatMap on [main] -> 1
    Waiting for the flow to end...
    flatMap on [parallel-1] -> 2
    flatMap on [parallel-2] -> 3
    Waiting for the flow to end...
    flatMap on [parallel-3] -> 4
    FLUX PROCESSED
    

    We see that operations 2, 3, and 4 have happened on a thread named parallel-x. These are threads spawned by Schedulers.parallel.

    NOTE: subscribeOn and publishOn methods can be used to gain more fine-grained control over threading.

    About blocking

    Now, does block, blockFirst and blockLast methods change how operations are scheduled/executed ? The response is no.

    When you use block, internally, the flow is subscribed as with subscribe() call. However, once the flow is triggered, instead of returning, internally Reactor uses the calling thread to loop and wait until the flux is done, as I've done in my first example above (but they do it in a much clever way).

    We can try it. Using the constrained flatMap, what is printed if we use block instead of manually looping ?

    The program:

    Flux.range(0, 5)
            .flatMap(i -> Mono
                    .just("flatMap on [" + Thread.currentThread().getName() + "] -> " + i)
                    .delayElement(Duration.ofMillis(50)),
                    2)
            .doOnNext(System.out::println)
            .blockLast();
    
    System.out.println("FLUX PROCESSED");
    

    prints:

    flatMap on [main] -> 0
    flatMap on [main] -> 1
    flatMap on [parallel-1] -> 2
    flatMap on [parallel-2] -> 3
    flatMap on [parallel-3] -> 4
    FLUX PROCESSED
    

    We see that as previously, the flux used both main thread and parallel threads to process its elements. But this time, the main program was "halted" until the flow has finished. Block prevented our program to resume until Flux completion.