Project Reactor + flatMap + Multiple onErrorContinue - Not working as expected


When multiple onErrorContinue operators are added to a pipeline to handle specific types of exception thrown from flatMap, the exception handling does not work as expected.

With the code below, I expect elements 1 to 6 to be dropped and elements 7 to 10 to be consumed by the subscriber.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapOnErrorContinueExample {
    public static void main(String[] args) {
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .flatMap(number -> {
                    if (number <= 3) {
                        return Mono.error(new NumberLesserThanThree("Number is lesser than 3"));
                    } else if (number > 3 && number <= 6) {
                        return Mono.error(new NumberLesserThanSixButGretherThan3("Number is grether than 6"));
                    } else {
                        return Mono.just(number);
                    }
                })
                .onErrorContinue(NumberLesserThanThree.class,
                        (throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 3"))

                .onErrorContinue(NumberLesserThanSixButGretherThan3.class,
                        (throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 6 but grether than 3"))

                .onErrorContinue((throwable, object) ->
                        System.err.println("Exception: " + throwable.getMessage()))

                .subscribe(number -> System.out.println("number is " + number),
                        error -> System.err.println("Exception in Subscription " + error.getMessage()));
    }

    public static class NumberLesserThanThree extends RuntimeException {
        public NumberLesserThanThree(final String msg) {
            super(msg);
        }
    }

    public static class NumberLesserThanSixButGretherThan3 extends RuntimeException {
        public NumberLesserThanSixButGretherThan3(final String msg) {
            super(msg);
        }
    }
}

Here is the output I am getting:

Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception in Subscription Number is grether than 6

Question: Why is the second onErrorContinue not called, and why is the exception sent to the subscriber instead?

Additional note: if I remove the first and second onErrorContinue, then all exceptions are handled by the third onErrorContinue. I could use this approach to receive every exception, check its type, and handle it accordingly. However, I would like cleaner exception handling than an if..else block.

How this question differs from "Why does Thread.sleep() trigger the subscription to Flux.interval()?"

1) This question is about exception handling and the order in which handlers are applied; the other question is about processing elements in parallel and making the main thread wait until all elements have been processed.
2) This question has nothing to do with threading: even if I add Thread.sleep(10000) after .subscribe, the behaviour does not change.


Solution

  • This again comes down to the unusual behaviour of onErrorContinue. It breaks the usual rule: rather than "catching" errors and changing the behaviour downstream as a result, it lets supporting operators upstream "look ahead" at it and behave accordingly, so it changes the result upstream.

    This is weird, and leads to some behaviour that's not immediately obvious, as is the case here. As far as I'm aware, all supporting operators only look ahead to the next onErrorContinue operator, rather than recursively searching ahead for all such operators. They evaluate the predicate of that next onErrorContinue (in this case, whether the error is of a certain type) and then behave accordingly: either invoking the handler if the predicate returns true, or passing the error downstream if not. (There's no case where they then move on to the next onErrorContinue operator, then the next, until a predicate is matched.)
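
    To make the "nearest operator only" rule concrete, here is a toy model in plain Java. It is only a sketch of the behaviour described above, not Reactor's implementation; NearestHandlerModel, Handler, dispatch and demo are hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model only: this is NOT how Reactor is implemented. All names are hypothetical.
public class NearestHandlerModel {

    /** Stands in for one onErrorContinue stage: a label plus its error predicate. */
    public static final class Handler {
        final String name;
        final Predicate<Throwable> matches;

        public Handler(String name, Predicate<Throwable> matches) {
            this.name = name;
            this.matches = matches;
        }
    }

    /**
     * Consults only the first handler after the failing operator. If its
     * predicate rejects the error, the error propagates; later handlers
     * are never tried.
     */
    public static String dispatch(List<Handler> handlersInPipelineOrder, Throwable error) {
        if (handlersInPipelineOrder.isEmpty()) {
            return "propagated";
        }
        Handler nearest = handlersInPipelineOrder.get(0);
        return nearest.matches.test(error) ? "handled by " + nearest.name : "propagated";
    }

    /** Runs two errors through three handlers, mirroring the shape of the question's pipeline. */
    public static List<String> demo() {
        List<Handler> handlers = List.of(
                new Handler("first", t -> t instanceof IllegalArgumentException),
                new Handler("second", t -> t instanceof IllegalStateException),
                new Handler("catch-all", t -> true));

        List<String> results = new ArrayList<>();
        results.add(dispatch(handlers, new IllegalArgumentException("matches first")));
        results.add(dispatch(handlers, new IllegalStateException("matches second only")));
        return results;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
        // prints:
        // handled by first
        // propagated
    }
}
```

    The second error is rejected by the first predicate and propagates, even though a later handler would have accepted it. That is the same pattern as the output in the question.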

    Clearly this is a contrived example - but because of these idiosyncrasies, I'd almost always recommend avoiding onErrorContinue. There are two "normal" ways to do that where flatMap() is involved:

    1. If flatMap() has an "inner reactive chain" in it, that is, it calls another method or series of methods that return a publisher, then just use onErrorResume() at the end of that inner chain to handle those errors. You can chain onErrorResume() calls, since it works with downstream operators, not upstream ones. This is by far the most common case.

    2. If flatMap() is an imperative collection of if / else branches that return different publishers, as it is here, and you want to (or have to) keep the imperative style, throw exceptions instead of using Mono.error(), and catch them as appropriate, returning Mono.empty() in case of an error:

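        .flatMap(number -> {
            try {
                if (number <= 3) {
                    throw new NumberLesserThanThree("Number is lesser than 3");
                } else if (number <= 6) {
                    throw new NumberLesserThanSixButGretherThan3("Number is greater than 3 but not greater than 6");
                } else {
                    return Mono.just(number);
                }
            } catch (NumberLesserThanThree ex) {
                // Handle it, then drop the element by returning an empty publisher
                return Mono.empty();
            } catch (NumberLesserThanSixButGretherThan3 ex) {
                // As above; every catch branch must return a publisher
                return Mono.empty();
            }
        })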
    

    In general, using one of these two approaches will make it much easier to reason about what's going on.
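
    As a rough illustration of the first approach, the sketch below chains two onErrorResume() calls on the inner publisher. It assumes reactor-core is on the classpath. The classify(...) helper and the FlatMapOnErrorResumeSketch class are hypothetical stand-ins for a real "inner reactive chain"; the exception types are copied from the question.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Sketch of approach 1. classify(...) is a hypothetical stand-in for a method
// that returns a publisher; the exception types mirror those in the question.
public class FlatMapOnErrorResumeSketch {

    public static class NumberLesserThanThree extends RuntimeException {
        public NumberLesserThanThree(final String msg) {
            super(msg);
        }
    }

    public static class NumberLesserThanSixButGretherThan3 extends RuntimeException {
        public NumberLesserThanSixButGretherThan3(final String msg) {
            super(msg);
        }
    }

    // Signals an error for 1..6 and emits the number otherwise.
    static Mono<Integer> classify(int number) {
        if (number <= 3) {
            return Mono.error(new NumberLesserThanThree(number + " is 3 or less"));
        } else if (number <= 6) {
            return Mono.error(new NumberLesserThanSixButGretherThan3(number + " is between 4 and 6"));
        }
        return Mono.just(number);
    }

    public static List<Integer> run() {
        return Flux.range(1, 10)
                .flatMap(number -> classify(number)
                        // Each onErrorResume handles only its own type; an error it
                        // does not match continues to the next one in the chain.
                        .onErrorResume(NumberLesserThanThree.class, ex -> Mono.empty())
                        .onErrorResume(NumberLesserThanSixButGretherThan3.class, ex -> Mono.empty()))
                .collectList()
                .block(); // blocking is for the demo only
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [7, 8, 9, 10]
    }
}
```

    Because the handlers sit on the inner Mono, each failing element is replaced by an empty publisher before flatMap() merges it, so the outer Flux never sees an error.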

    (For the sake of completeness after reading the comments - this isn't anything to do with the reactive chain being unable to complete before the main thread exits.)