javareactive-programmingspring-webfluxreactor

block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-


I've read that there is an option to make a blocking call using Mono. So I tried to write several code snippets:

A)

Mono.just("qwerty")
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic())
.block()

B)

Mono<String> customMono = Mono.just("qwerty");
Mono<String> blockedMono = customMono
        .subscribeOn(Schedulers.boundedElastic())
        .publishOn(Schedulers.boundedElastic());
System.out.println("blockedMono.block(): " + blockedMono.block());

C)

 Mono<String> customMono = Mono.just("qwerty");
 Mono<String> blockedMono =  Mono.just(0)
            .subscribeOn(Schedulers.boundedElastic())
            .publishOn(Schedulers.boundedElastic())
            .then(customMono);    
 System.out.println("blockedMono.block(): " + blockedMono.block());

leads to the same error:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1

Is there way to fix it ?

P.S. I need a blocking call and I am aware that it is not good to use blocking operations in reative code

P.S.2

This works but I want to avoid converting to Future

Mono.just("qwerty").toFuture().get()

P.S.3

As @dan1st noticed the behaviour depends on execution context.

  1. All those code snippets works without any exceptions if we put them to the main method

  2. The behaviour described in the topic is experienced if we put the code inside

    @GetMapping(..)
    public void someEndpoint(...) {
       // snippet is here
    }
    

So this behaviour depends on spring web flux somehow

Could you please clarify why and how to fix it ?

UPDATE

Based on the answer I was able to write the code block below. As a result this line doesn't throw any exception but it returns null.

org.springframework.security.oauth2.jwt.Jwt jwt = it.block();

@GetMapping("/test")
public Mono<org.springframework.security.oauth2.jwt.Jwt> test() throws ExecutionException, InterruptedException {
    Mono<org.springframework.security.oauth2.jwt.Jwt> firstMono =
            ReactiveSecurityContextHolder.getContext()
                    .publishOn(Schedulers.boundedElastic()) //<-- this allows .block() somewhere downstream
                    .subscribeOn(Schedulers.boundedElastic())
                    .flatMap(securityContext -> Mono.just((org.springframework.security.oauth2.jwt.Jwt) securityContext.getAuthentication().getPrincipal()));
    Mono<org.springframework.security.oauth2.jwt.Jwt> secondMono = Mono.just(firstMono)
            .publishOn(Schedulers.boundedElastic())
            .map(it -> {
                org.springframework.security.oauth2.jwt.Jwt jwt = it.block();
                System.out.println(Thread.currentThread() + "-" + jwt);
                return jwt;
            });
    return secondMono;
}

So endpoint method fails with error:

java.lang.NullPointerException: The mapper [....my.Controller$$Lambda$2012/0x0000000800b68840] returned a null value

But if I write

@GetMapping("/test")
public Mono<org.springframework.security.oauth2.jwt.Jwt> test() throws ExecutionException, InterruptedException {
    Mono<org.springframework.security.oauth2.jwt.Jwt> firstMono =
            ReactiveSecurityContextHolder.getContext()
                    .map(securityContext ->(org.springframework.security.oauth2.jwt.Jwt) securityContext.getAuthentication().getPrincipal());        
    return firstMono;
}

everything is OK and response contains JWT. Looks like it is because of spring webflux context magic.


Solution

  • Disclaimer

    This is a rephrase and clarification of my answer from another post.

    Blocking in Reactor

    When you block in reactor, it means the thread calling block() is locked, waiting for the block operation to end.

    One important fact is that the blocked thread is the calling thread, not one of the thread of the publisher. It might look like an obvious statement, but I've been bitten by this error in the past, and many other users also (this is the subject of your question, for example).

    As block is not part of the reactive pipeline, when you do this:

    public static void main(String[] args) {
       Mono.just(1000)
           .publishOn(Schedulers.single())
           .flatMap(delay -> Mono.delay(Duration.ofMillis(delay)) // <1>
           .block(); // <2>
    }
    
    1. flatMap is executed by single scheduler
    2. block is executed by program main thread.

    And when you do this:

    public static void main(String[] args) {
        var publisher = Mono.just(1000)
            .subscribeOn(Schedulers.single())
            .flatMap(delay -> Mono.delay(Duration.ofMillis(delay));
       
        Mono.just("whatever")
            .publishOn(Schedulers.boundedElastic())
            .map(it -> publisher.block()) // <1>
            .block(); // <2>
    }
    
    1. The first block is executed on a thread of the bounded elastic scheduler.
    2. The second block is executed from program main thread.

    When blocking is forbidden

    To discourage its usage, Reactor enforce context verification. When block() is called, it checks what is the calling thread, and if it consider the calling thread comes from a blocking incompatible scheduler, it will raise an error, to prevent locking the thread.

    Most of Reactor and Webflux schedulers are incompatible with block(), because they are designed for high throughput using minimal resource.

    Therefore, when you return a publisher containing block operations in webflux, most of the time it will be executed in a scheduler that do not accept it, and you end up with the error you describe.

    How to block in the middle of a reactive pipeline

    First and foremost, try to avoid blocking. Verify if this is avoidable. Otherwise:

    1. Select a blocking compatible scheduler:
    2. Wrap the publisher you want to block with another one, that will publish / schedule its actions on your scheduler:
      Scheduler blockingCompatibleScheduler = Schedulers.boundedElastic();
      Mono<T> toBlock = Mono...
      Mono<T> wrapper = Mono.fromCallable(() -> toBlock.block())
                            .subscribeOn(blockingCompatibleScheduler);
      // or
      Mono<T> wrapper = Mono.just("any")
                            .publishOn(blockingCompatibleScheduler)
                            .map(it -> toBlock.block());
      
      More details available in dedicated section of the official doc

    Block breaks context

    Until now, I was refering to the execution context. But there is a second kind: the state context.

    Since Reactor 3.1, a Context / ContextView API is provided to share contextual information across chained subscriptions, from downstream to upstream.

    The official documentation contains an dedicated section about this mechanism, to provide in-depth explanation (this is quite complex).

    Due to its nature, block operator prevent this mechanism to work. As a context propagate information from a downstream subscription to an inner/upstream one, it cannot provide context information to block operator: it use a hidden subscription, disconnected from the parent/downstream subscription/pipeline. Said otherwise, a flux/mono cannot access an inner blocked publisher, and therefore cannot propagate context in it.

    We can test it in a simplified example:

    import reactor.core.publisher.Mono;
    
    public class BlockBreaksContext {
    
        static final Object CTX_KEY = new Object();
    
        /**
         * Add "Hello" message to the provided action context, then run/block it and print
         * output value.
         */
        static void execute(Mono<String> action) {
            String value = Mono.from(action)
                               .contextWrite(ctx -> ctx.put(CTX_KEY, "Hello"))
                               .block();
            System.out.println(value);
        }
    
        public static void main(String[] args) {
            Mono<String> getContextValue = Mono.deferContextual(ctx
                    -> Mono.just(ctx.getOrDefault(CTX_KEY, "No value from context")));
    
            // Without blocking, the mono will receive the context written by downstream subscription
            execute(getContextValue.map(it -> "NO BLOCKING: " + it));
    
            // With blocking, the Mono is **not** part of the "main" pipeline/subscription.
            // Therefore, it cannot receive context on execution, because block cause an
            // independent/isolated subscription, whose main chain of event is unaware of.
            Mono wrapBlock = Mono.fromCallable(() -> getContextValue.block());
            execute(wrapBlock.map(it -> "BLOCKING: " + it));
        }
    }
    

    This program prints:

    NO BLOCKING: Hello
    BLOCKING: No value from context
    

    In your question, you try to access a security token. As the token is resolved upon client request, Webflux put it in the response publisher context. When you block, you disassociate your blocked publisher from the response publisher.