I've read that there is an option to make a blocking call using Mono. So I tried to write several code snippets:
A)
Mono.just("qwerty")
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic())
.block()
B)
Mono<String> customMono = Mono.just("qwerty");
Mono<String> blockedMono = customMono
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic());
System.out.println("blockedMono.block(): " + blockedMono.block());
C)
Mono<String> customMono = Mono.just("qwerty");
Mono<String> blockedMono = Mono.just(0)
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic())
.then(customMono);
System.out.println("blockedMono.block(): " + blockedMono.block());
leads to the same error:
block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
Is there way to fix it ?
P.S. I need a blocking call and I am aware that it is not good to use blocking operations in reative code
P.S.2
This works but I want to avoid converting to Future
Mono.just("qwerty").toFuture().get()
P.S.3
As @dan1st noticed the behaviour depends on execution context.
All those code snippets works without any exceptions if we put them to the main method
The behaviour described in the topic is experienced if we put the code inside
@GetMapping(..)
public void someEndpoint(...) {
// snippet is here
}
So this behaviour depends on spring web flux somehow
Could you please clarify why and how to fix it ?
Based on the answer I was able to write the code block below. As a result this line doesn't throw any exception but it returns null
.
org.springframework.security.oauth2.jwt.Jwt jwt = it.block();
@GetMapping("/test")
public Mono<org.springframework.security.oauth2.jwt.Jwt> test() throws ExecutionException, InterruptedException {
Mono<org.springframework.security.oauth2.jwt.Jwt> firstMono =
ReactiveSecurityContextHolder.getContext()
.publishOn(Schedulers.boundedElastic()) //<-- this allows .block() somewhere downstream
.subscribeOn(Schedulers.boundedElastic())
.flatMap(securityContext -> Mono.just((org.springframework.security.oauth2.jwt.Jwt) securityContext.getAuthentication().getPrincipal()));
Mono<org.springframework.security.oauth2.jwt.Jwt> secondMono = Mono.just(firstMono)
.publishOn(Schedulers.boundedElastic())
.map(it -> {
org.springframework.security.oauth2.jwt.Jwt jwt = it.block();
System.out.println(Thread.currentThread() + "-" + jwt);
return jwt;
});
return secondMono;
}
So endpoint method fails with error:
java.lang.NullPointerException: The mapper [....my.Controller$$Lambda$2012/0x0000000800b68840] returned a null value
But if I write
@GetMapping("/test")
public Mono<org.springframework.security.oauth2.jwt.Jwt> test() throws ExecutionException, InterruptedException {
Mono<org.springframework.security.oauth2.jwt.Jwt> firstMono =
ReactiveSecurityContextHolder.getContext()
.map(securityContext ->(org.springframework.security.oauth2.jwt.Jwt) securityContext.getAuthentication().getPrincipal());
return firstMono;
}
everything is OK and response contains JWT. Looks like it is because of spring webflux context magic.
This is a rephrase and clarification of my answer from another post.
When you block in reactor, it means the thread calling block()
is locked, waiting for the block operation to end.
One important fact is that the blocked thread is the calling thread, not one of the thread of the publisher. It might look like an obvious statement, but I've been bitten by this error in the past, and many other users also (this is the subject of your question, for example).
As block is not part of the reactive pipeline, when you do this:
public static void main(String[] args) {
Mono.just(1000)
.publishOn(Schedulers.single())
.flatMap(delay -> Mono.delay(Duration.ofMillis(delay)) // <1>
.block(); // <2>
}
And when you do this:
public static void main(String[] args) {
var publisher = Mono.just(1000)
.subscribeOn(Schedulers.single())
.flatMap(delay -> Mono.delay(Duration.ofMillis(delay));
Mono.just("whatever")
.publishOn(Schedulers.boundedElastic())
.map(it -> publisher.block()) // <1>
.block(); // <2>
}
To discourage its usage, Reactor enforce context verification. When block()
is called, it checks what is the calling thread, and if it consider the calling thread comes from a blocking incompatible scheduler, it will raise an error, to prevent locking the thread.
Most of Reactor and Webflux schedulers are incompatible with block()
, because they are designed for high throughput using minimal resource.
Therefore, when you return a publisher containing block operations in webflux, most of the time it will be executed in a scheduler that do not accept it, and you end up with the error you describe.
First and foremost, try to avoid blocking. Verify if this is avoidable. Otherwise:
Scheduler blockingCompatibleScheduler = Schedulers.boundedElastic();
Mono<T> toBlock = Mono...
Mono<T> wrapper = Mono.fromCallable(() -> toBlock.block())
.subscribeOn(blockingCompatibleScheduler);
// or
Mono<T> wrapper = Mono.just("any")
.publishOn(blockingCompatibleScheduler)
.map(it -> toBlock.block());
More details available in dedicated section of the official docUntil now, I was refering to the execution context. But there is a second kind: the state context.
Since Reactor 3.1, a Context / ContextView API is provided to share contextual information across chained subscriptions, from downstream to upstream.
The official documentation contains an dedicated section about this mechanism, to provide in-depth explanation (this is quite complex).
Due to its nature, block
operator prevent this mechanism to work.
As a context propagate information from a downstream subscription to an inner/upstream one, it cannot provide context information to block
operator: it use a hidden subscription, disconnected from the parent/downstream subscription/pipeline. Said otherwise, a flux/mono cannot access an inner blocked publisher, and therefore cannot propagate context in it.
We can test it in a simplified example:
import reactor.core.publisher.Mono;
public class BlockBreaksContext {
static final Object CTX_KEY = new Object();
/**
* Add "Hello" message to the provided action context, then run/block it and print
* output value.
*/
static void execute(Mono<String> action) {
String value = Mono.from(action)
.contextWrite(ctx -> ctx.put(CTX_KEY, "Hello"))
.block();
System.out.println(value);
}
public static void main(String[] args) {
Mono<String> getContextValue = Mono.deferContextual(ctx
-> Mono.just(ctx.getOrDefault(CTX_KEY, "No value from context")));
// Without blocking, the mono will receive the context written by downstream subscription
execute(getContextValue.map(it -> "NO BLOCKING: " + it));
// With blocking, the Mono is **not** part of the "main" pipeline/subscription.
// Therefore, it cannot receive context on execution, because block cause an
// independent/isolated subscription, whose main chain of event is unaware of.
Mono wrapBlock = Mono.fromCallable(() -> getContextValue.block());
execute(wrapBlock.map(it -> "BLOCKING: " + it));
}
}
This program prints:
NO BLOCKING: Hello
BLOCKING: No value from context
In your question, you try to access a security token. As the token is resolved upon client request, Webflux put it in the response publisher context. When you block, you disassociate your blocked publisher from the response publisher.