I'm using Spring Reactor with Spring Cloud Stream (GCP Pub/Sub Binder) and running into error handling issues. I'm able to reproduce the issue with a very simple example:
    @Bean
    public Function<Flux<String>, Mono<Void>> consumer() {
        return flux -> flux
            .doOnNext(msg -> log.info("New message received: {}", msg))
            .map(msg -> {
                if (true) {
                    throw new RuntimeException("exception encountered!");
                }
                return msg;
            })
            .doOnError(throwable -> log.error("Failed to consume message", throwable))
            .then();
    }
The behavior I expect is to see "Failed to consume message" printed; however, that's not what happens. When I add a .log() call to the chain I see onNext/onComplete signals, whereas I would expect to see onError signals.
My actual code looks something like this:
    @Bean
    public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
        return flux -> flux
            .doOnNext(msg -> log.info("New message received: {}", msg))
            .flatMap(myService::processMessage) // exception happens deep in here
            .doOnError(throwable -> log.error("Failed to consume message", throwable))
            .then();
    }
I noticed that deep in my service class I was attempting to do error handling on my Reactor publishers. However, the onError signal wouldn't occur when using Spring Cloud Stream. If I simply invoked my service as myService.processMessage(msg) in a unit test and mocked the exception, my reactive chain would propagate error signals correctly.
It seems to be an issue when I hook in to Spring Cloud Stream. I'm wondering if Spring Cloud Function/Stream is doing any global error wrapping?
In my non-trivial code I do notice this error message that may have something to do with why I'm not getting error signals?
ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...
To further my confusion, I am able to get the onError signal in my reactive chain if I switch my Spring Cloud Stream binding to the non-reactive implementation like so:
    @Bean
    public Consumer<CustomMessage> consumer(MyService myService) {
        return customMessage -> Mono.just(customMessage)
            .doOnNext(msg -> log.info("New message received: {}", msg))
            .flatMap(myService::processMessage) // exception happens deep in here
            .doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
            .subscribe();
    }
So this is what I've gathered from my own investigation; maybe it will help others. Forewarning: I might not be using the right "Spring Reactor language", but this is how I ended up solving it...
In Hoxton.SR5, an onErrorContinue was included on the reactive binding that managed the flux subscription. The problem with onErrorContinue is that it affects upstream operators by applying the BiConsumer function at the operator that failed (if supported). This means that when an error occurred in our map/flatMap operators, the onErrorContinue BiConsumer would kick in and modify the downstream signal to either onComplete() (for a Mono<T>) or request(...) (if it requested a new element from a Flux<T>). This resulted in our doOnError(...) operators not executing since there were no onError() signals.
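The effect described above can be reproduced without Spring Cloud Stream at all. The following is a minimal sketch, assuming plain Reactor on the classpath; the class name OnErrorContinueDemo and the three lists are hypothetical names used only for illustration, and the trailing onErrorContinue is a stand-in for the wrapper the binder applied, not the binder's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class OnErrorContinueDemo {

    // Hypothetical recorders: what each stage of the chain observed.
    static final List<Integer> received = new ArrayList<>();
    static final List<String> doOnErrorCalls = new ArrayList<>();
    static final List<String> continueCalls = new ArrayList<>();

    static void run() {
        received.clear();
        doOnErrorCalls.clear();
        continueCalls.clear();

        Flux.just(1, 2, 3)
            .map(i -> {
                if (i == 2) {
                    throw new RuntimeException("error on " + i);
                }
                return i;
            })
            // Our own handler: it is never invoked, because map hands the
            // failure to the onErrorContinue strategy instead of emitting onError.
            .doOnError(t -> doOnErrorCalls.add(t.getMessage()))
            // Stand-in for the wrapper applied downstream of the user function.
            .onErrorContinue((t, value) -> continueCalls.add(t.getMessage()))
            .subscribe(received::add);
    }

    public static void main(String[] args) {
        run();
        System.out.println("received: " + received);
        System.out.println("doOnError calls: " + doOnErrorCalls);
        System.out.println("onErrorContinue calls: " + continueCalls);
    }
}
```

Elements 1 and 3 reach the subscriber, the onErrorContinue consumer sees the failure for element 2, and the doOnError list stays empty, which matches the missing "Failed to consume message" log in the question.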
Eventually the SCS team decided to remove this error handling wrapper. Hoxton.SR6 no longer has this onErrorContinue. However, this meant that exceptions propagating up to the SCS binding would result in the Flux subscription being severed. Subsequent messages would then have nowhere to be routed since there were no subscribers.
Error handling is therefore now the client's responsibility. We add an onErrorResume operator to the inner publisher to effectively drop error signals. When an error is encountered within the myService::processMessage publisher, onErrorResume switches to the fallback publisher that was passed in as a parameter and resumes from that point in the operator chain. In our case, the fallback is simply Mono.empty(), which drops the error signal while still letting error handling inside the inner publisher run, and without affecting the outer source publisher.
onErrorResume Example/Explanation
The above technique can be illustrated with a very simple example.
    Flux.just(1, 2, 3)
        .flatMap(i -> i == 2
            ? Mono.error(new RuntimeException("error"))
            : Mono.just(i))
        .onErrorResume(t -> Flux.just(4, 5, 6))
        .doOnNext(i -> log.info("Element: {}", i))
        .subscribe();
The Flux<Integer> above will output the following:
Element: 1
Element: 4
Element: 5
Element: 6
Since an error is encountered at element 2, the onErrorResume fallback kicks in and the new publisher becomes Flux.just(4, 5, 6), effectively resuming from the fallback. In our case, we don't want to affect the source publisher (i.e. Flux.just(1, 2, 3)). We just want to drop the erroneous element (2) and continue to the next element (3).
We can't simply change Flux.just(4, 5, 6) to Flux.empty() or Mono.empty() like so:
    Flux.just(1, 2, 3)
        .flatMap(i -> i == 2
            ? Mono.error(new RuntimeException("error"))
            : Mono.just(i))
        .onErrorResume(t -> Mono.empty())
        .doOnNext(i -> log.info("Element: {}", i))
        .subscribe();
This would cause the following to be output:
Element: 1
This is because onErrorResume has replaced the upstream publishers with the fallback publisher (i.e. Mono.empty()) and resumed from that point on.
To achieve our desired output of:
Element: 1
Element: 3
We must place the onErrorResume operator on the inner publisher of the flatMap:
    public Mono<Integer> func(int i) {
        // == compares; the original single = was an assignment and would not compile
        return i == 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);
    }

    Flux.just(1, 2, 3)
        .flatMap(i -> func(i)
            .onErrorResume(t -> Mono.empty()))
        .doOnNext(i -> log.info("Element: {}", i))
        .subscribe();
Now, the onErrorResume only affects the inner publisher returned by func(i). If an error occurs in the operators inside func(i), onErrorResume will fall back to Mono.empty(), effectively completing the Mono<T> without blowing up. This also still allows error handling operators (e.g. doOnError) within func(i) to be applied before the fallback runs. This is because, unlike onErrorContinue, onErrorResume does not affect upstream operators or change the next signal at the location of the error.
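To check the claim that error handling inside func(i) still runs before the fallback, here is a small sketch, again assuming plain Reactor. The class name InnerOnErrorResumeDemo and the two lists are hypothetical, and this func is a stand-in for something like myService.processMessage(msg) that carries its own doOnError.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class InnerOnErrorResumeDemo {

    // Hypothetical recorders for what the subscriber and the inner handler saw.
    static final List<Integer> received = new ArrayList<>();
    static final List<String> innerErrors = new ArrayList<>();

    // Stand-in for a service method that does its own error handling.
    static Mono<Integer> func(int i) {
        return Mono.just(i)
            .map(v -> {
                if (v == 2) {
                    throw new RuntimeException("error on " + v);
                }
                return v;
            })
            // Inner handler: sees the onError signal before the fallback applies.
            .doOnError(t -> innerErrors.add(t.getMessage()));
    }

    static void run() {
        received.clear();
        innerErrors.clear();

        Flux.just(1, 2, 3)
            // The fallback is attached to the inner publisher only, so the
            // outer Flux keeps its subscription and moves on to element 3.
            .flatMap(i -> func(i).onErrorResume(t -> Mono.empty()))
            .subscribe(received::add);
    }

    public static void main(String[] args) {
        run();
        System.out.println("received: " + received);
        System.out.println("inner errors: " + innerErrors);
    }
}
```

The subscriber receives 1 and 3, and the inner doOnError records the failure for element 2 exactly once, so the error is both observed and dropped.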
Reusing the code snippet from my question, I've upgraded my Spring Cloud version to Hoxton.SR6 and changed the code to something like this:
    @Bean
    public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
        return flux -> flux
            .doOnNext(msg -> log.info("New message received: {}", msg))
            .flatMap(msg -> myService.processMessage(msg)
                .onErrorResume(throwable -> Mono.empty())
            )
            .then();
    }
Note that the onErrorResume is on the inner publisher (inside the flatMap).