Here's what I found in the Reactive Streams specification
Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way for a Subscriber to signal failure is by cancelling its Subscription.
I checked it
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
public class GenericTest {
@Test
void test() {
Mono.just("some-string")
.map(this::makeNull)
.map(String::toUpperCase) // here it should throw
.subscribe(System.out::println);
}
private String makeNull(String string) {
return null;
}
}
[main] ERROR reactor.core.publisher.Operators -- Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: The mapper [com.example.dynamicgateway.misc.GenericTest$$Lambda$377/0x0000000800cd8af8] returned a null value.
Caused by: java.lang.NullPointerException: The mapper [com.example.dynamicgateway.misc.GenericTest$$Lambda$377/0x0000000800cd8af8] returned a null value.
It looks like an exception but actually it's just a log message
Is Project Reactor a bona fide Reactive Streams implementation? Or did it find some loophole in the spec and can legally avoid throwing NPEs on null arguments under certain circumstances?
Reactor does throw a NullPointerException when trying to use null values.
The reason why the exception is not launched in your test method is because subscribe
triggers the flow asynchronously. The error is not raised in the method, but in a background worker. Therefore, it is only logged by the thread that run the pipeline.
In the context of tests, you should use reactor-test
module and its StepVerifier
class, that listens to the events on the pipeline to test.
We can write your example as follow:
@Test
public void mapNullShouldThrowError() {
var myMono = Mono.just("test")
.<String>map(it -> null)
.map(String::toUpperCase);
StepVerifier.create(myMono)
.expectError(NullPointerException.class)
.verify();
}
You can also test this assertion by using block()
instead of subscribe
. Although it should be avoided if possible, in this case you can try it: block will cause the error to be propagated in the test method.