The Reactor error handling documentation (https://projectreactor.io/docs/core/3.4.10/reference/index.html#error.handling) states that error-handling operators do not let the original sequence continue.
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.
But the Javadoc for onErrorContinue (https://projectreactor.io/docs/core/3.4.10/api/index.html) states the following:
Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements.
Is onErrorContinue not considered an "error-handling operator"?
It does seem to allow the original sequence to continue:
    Flux.range(1, 5)
        .map(i -> {
            if (i == 3) {
                throw new RuntimeException("Forcing exception for " + i);
            }
            return i;
        })
        .doOnNext(i -> System.out.println(i))
        .onErrorContinue((throwable, o) -> System.err.println("Error while processing " + o + " - " + throwable.getMessage()))
        .subscribe();
Result (Dropped 3 but continued with subsequent elements)
1
2
4
5
Error while processing 3 - Forcing exception for 3
Process finished with exit code 0
The documentation does state that onErrorContinue is dependent on operator support. Is there any other way to let the original sequence (the source Flux) continue that works for all operators? I don't want an alternate Flux to replace my source Flux in case of errors (the onErrorResume behaviour). I just want to ignore the problem element and continue with the source Flux.
EDIT 1 (My use case)
I have a Reactor Kafka source Flux and I want to consume from it indefinitely, regardless of errors. I was using onErrorContinue, but based on feedback received on this post I have replaced it with onErrorResume. Below is the code I have at this point, but I am not sure whether it will work in all cases (by "work", I mean streaming continuously from Kafka regardless of any errors). Any suggestions, please?
    KafkaReceiver.create(receiverOptions)
        .receive()
        .flatMap(record -> processRequest(record.value())
            .doOnNext(e -> record.receiverOffset().acknowledge())
            .doOnError(e -> {
                System.err.println("Error occurred for msg: " + record.value() + ", Error " + e);
                record.receiverOffset().acknowledge();
            })
            .onErrorResume(e -> Mono.empty()))
        .repeat(() -> true)
        .retryWhen(Retry.indefinitely())
        .doFinally(signalType -> {
            // don't expect control to ever reach here
            System.err.println("KafkaReceiverFlux terminating with Signal type: " + signalType);
        })
        .subscribe();
The Reactive Streams specification, which Reactor follows, states that all errors in a stream are terminal events, and this is what the Reactor error handling documentation builds on. In order to handle an error, an error must have occurred, and that error must be terminal according to the spec. In all specification-compliant cases (which is nearly all cases) this is true.
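To make the terminal nature of errors concrete, here is a minimal sketch. It assumes reactor-core is on the classpath; the class name TerminalErrorDemo and the fallback value -1 are made up for illustration. An onErrorResume placed on the outer sequence does not resume the range: elements 4 and 5 are never emitted, and the fallback sequence takes over instead.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class TerminalErrorDemo {

    // Collects everything a subscriber would receive from the pipeline.
    static List<Integer> run() {
        return Flux.range(1, 5)
                .map(i -> {
                    if (i == 3) {
                        throw new IllegalStateException("Forcing exception for " + i);
                    }
                    return i;
                })
                // The error terminates the range; this fallback Flux replaces it.
                .onErrorResume(e -> Flux.just(-1))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, -1]
    }
}
```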
onErrorContinue() is, however, a rather special type of operator. It is an error-handling operator, but one that breaks the reactive specification by allowing the error to be dropped, and the stream to continue. It can be potentially useful in cases where you want continuous processing, to never stop, with an error side-channel.
That being said, it has a bunch of problems: not just that it requires specific operator support (because operators fully compliant with the Reactive Streams specification may completely disregard onErrorContinue() while still remaining compliant), but also a whole bunch of other issues. A few of us discussed these here if you're interested in some background reading. In future it's possible that it'll get moved to an unsafe() grouping or similar, but it's a very difficult problem to solve.
That being said, the core advice is what's in the Javadoc at the moment: avoid onErrorContinue() in all but very specific cases, and instead use onErrorResume() on each individual publisher, like so:
    //Stream
    .flatMap(id -> repository.retrieveById(id)
        .doOnError(System.err::println)
        .onErrorResume(e -> Mono.empty()))
This introduces greater verbosity and possibly a small performance penalty (not something I've verified), but has the advantage of being far clearer in its behaviour, not breaking the Reactive Streams spec, and not requiring specific operator support to work. It's what I'd recommend in pretty much every case. I personally feel the subtleties of onErrorContinue() are too complicated to reason about in most cases.
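For reference, the per-publisher pattern above can be turned into a small runnable sketch. It assumes reactor-core is on the classpath; the retrieveById method here is a hypothetical stand-in for a repository call that fails for id 3, and the class name is made up. Each inner Mono recovers from its own error by completing empty, so the outer Flux never sees an error signal and keeps emitting.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class PerElementRecoveryDemo {

    // Hypothetical stand-in for repository.retrieveById(id); fails for id 3.
    static Mono<Integer> retrieveById(int id) {
        return Mono.fromCallable(() -> {
            if (id == 3) {
                throw new IllegalStateException("Forcing exception for " + id);
            }
            return id;
        });
    }

    // Collects the elements that survive per-element error handling.
    static List<Integer> run() {
        return Flux.range(1, 5)
                .flatMap(id -> retrieveById(id)
                        // Side-channel logging of the failure.
                        .doOnError(e -> System.err.println("Error while processing " + id + " - " + e.getMessage()))
                        // Replace only the failed inner Mono with an empty one.
                        .onErrorResume(e -> Mono.empty()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 4, 5]
    }
}
```

Only the inner publisher for id 3 is terminated and replaced; the source Flux is untouched, which is why this works without any special operator support.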