I'm using project reactor and I have a very long flow in which I get an exception (when parsing a string to json with Jackson). The thing is that even though I use
.map(this::parser)
.onErrorResume(err -> {
log.error(myMsg);
return Mono.empty();
})
.flatMap(writeToPulsar)
.subscribe()
The flow won't continue. I do see the error log and the flow doesn't throw an exception, but the flow won't continue to get executed. Any reason for this to happen?
When I change the code to the (unwanted) .onErrorContinue()
, the data pipeline won't get stopped:
.map(this::parser)
.onErrorContinue((err, msg) -> {
log.error(myMsg);
})
.flatMap(writeToPulsar)
.subscribe()
As a part of the error handling you are returning Mono.empty()
and it means your flow will be completed without emitting any result and flatMap
will not be executed.
Not sure about the expected behavior but if you want to continue the flow - return some "default" value from onErrorResume
instead or use switchIfEmpty
operator to provide another publisher.