Tags: java, java-stream, java-24

Java Gatherer Downstream.isRejecting is inconsistent


When gather2's integrator returns false, downstream.push in gather1's finisher returns false. That is expected.

But why does downstream.isRejecting() always return false? It should return true, since the downstream is rejecting elements. The problem only happens when two gatherers are chained.

Stream.of(0)
        .gather(gather1())
        .gather(gather2())
        .forEach(System.out::println);

static Gatherer<Integer, ?, Integer> gather1() {
    return Gatherer.ofSequential(
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> downstream.push(element)),
            (unused, downstream) -> {
                for (int i = 1; i <= 10 && !downstream.isRejecting(); i++) {
                    System.out.println("finisher pushing " + i + ", push result " + downstream.push(i) + ", isRejecting " + downstream.isRejecting());
                }
            }
    );
}

static Gatherer<Integer, ?, Integer> gather2() {
    return Gatherer.ofSequential(
            Gatherer.Integrator.of(
                    (state, element, downstream) -> false
            )
    );
}

Solution

  • isRejecting operates on a best-effort basis. Its documentation says:

    This is best-effort only, once this returns true it should never return false again for the same instance.

    So technically it is always "correct" for it to return false.

    Going through the implementation, we can see that the argument passed to the downstream parameter in a Composite gatherer is just a lambda expression (source). For example, in the finisher implementation,

    void finish(Downstream<? super RR> c) {
        if (leftFinisher != Gatherer.<A, R>defaultFinisher())
            leftFinisher.accept(leftState, r -> rightIntegrate(r, c));
        if (rightFinisher != Gatherer.<AA, RR>defaultFinisher())
            rightFinisher.accept(rightState, c);
    }
    

    r -> rightIntegrate(r, c) is the downstream your gather1 receives. Because it is a lambda, it only supplies push and does not override isRejecting at all. The default implementation is used, which returns false.
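
To see this in isolation, here is a minimal sketch that builds a Gatherer.Downstream from a lambda, outside of any stream. The class and method names (LambdaDownstreamDemo, lambdaIsRejecting) are made up for illustration.

```java
import java.util.stream.Gatherer;

public class LambdaDownstreamDemo {

    // A Downstream written as a lambda only supplies push();
    // isRejecting() falls back to the interface's default method.
    static boolean lambdaIsRejecting() {
        Gatherer.Downstream<Integer> refusesEverything = element -> false;
        refusesEverything.push(1);               // the element is refused
        return refusesEverything.isRejecting();  // default implementation
    }

    public static void main(String[] args) {
        System.out.println("isRejecting after a refused push: " + lambdaIsRejecting());
    }
}
```

Even though every push is refused, isRejecting keeps reporting false, which is the same thing gather1's finisher observes in the chained pipeline.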

    Looking further down, there is a comment addressing this (isKnownDone() in the comment appears to refer to what is now isRejecting()):

    Currently we use the following to ferry elements from the left Gatherer to the right Gatherer, but we create the Gatherer.Downstream as a lambda which means that the default implementation of isKnownDone() is used.

    If it is determined that we want to be able to support the full interface of Gatherer.Downstream then we have the following options:

    1. Have State implement Downstream<? super R> and store the passed in Downstream<? super RR> downstream as an instance field in integrate() and read it in push(R r).
    2. Allocate a new Gatherer.Downstream<? super R> for each invocation of integrate() which might prove costly.
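
For a rough idea of what "supporting the full interface" means, here is a hypothetical user-level wrapper that is a real object rather than a lambda, so it can override isRejecting. It is not the JDK's implementation, and TrackingDownstream is an invented name; it only shows the general shape of allocating a Downstream that remembers a refused push.

```java
import java.util.stream.Gatherer;

public class TrackingDownstreamDemo {

    /** Hypothetical wrapper: remembers a refused push and reports it via isRejecting(). */
    static final class TrackingDownstream<T> implements Gatherer.Downstream<T> {
        private final Gatherer.Downstream<? super T> delegate;
        private boolean rejecting;

        TrackingDownstream(Gatherer.Downstream<? super T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean push(T element) {
            if (rejecting) {
                return false;                      // already refused once, stay refused
            }
            rejecting = !delegate.push(element);   // remember a refusal
            return !rejecting;
        }

        @Override
        public boolean isRejecting() {
            return rejecting || delegate.isRejecting();
        }
    }

    // Wraps a lambda that refuses everything and checks isRejecting() after one push.
    static boolean rejectingAfterRefusedPush() {
        TrackingDownstream<Integer> d = new TrackingDownstream<>(element -> false);
        d.push(1);
        return d.isRejecting();
    }

    public static void main(String[] args) {
        System.out.println("isRejecting after a refused push: " + rejectingAfterRefusedPush());
    }
}
```

The cost mentioned in the comment is visible here: every wrapper is an extra allocation with an extra field, which a plain lambda avoids.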

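    So basically, in a chain of gatherers the downstream handed to the upstream gatherer currently always reports isRejecting() as false; only the return value of push reflects that the next gatherer has stopped. A "better" implementation is not supported yet.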
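
Given that, one way to write gather1's finisher is to stop as soon as push returns false instead of polling isRejecting. The following is a minimal sketch based on the question's gatherers; the class name PushResultDemo and the attempts list are illustrative additions used only to record what the finisher tried to push.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class PushResultDemo {

    // Same shape as the question's gather1, but the finisher stops on the
    // first refused push. 'attempts' records every value it tried to push.
    static Gatherer<Integer, ?, Integer> gather1(List<Integer> attempts) {
        return Gatherer.ofSequential(
                Gatherer.Integrator.ofGreedy((state, element, downstream) -> downstream.push(element)),
                (unused, downstream) -> {
                    for (int i = 1; i <= 10; i++) {
                        attempts.add(i);
                        if (!downstream.push(i)) {
                            break; // downstream refused the element, stop pushing
                        }
                    }
                }
        );
    }

    // Unchanged from the question: refuses every element.
    static Gatherer<Integer, ?, Integer> gather2() {
        return Gatherer.ofSequential(
                Gatherer.Integrator.of((state, element, downstream) -> false)
        );
    }

    // Runs the chained pipeline and returns what the finisher attempted.
    static List<Integer> run() {
        List<Integer> attempts = new ArrayList<>();
        Stream.of(0)
                .gather(gather1(attempts))
                .gather(gather2())
                .forEach(ignored -> { });
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println("finisher attempted: " + run());
    }
}
```

With gather2 refusing everything, the finisher gives up after its first attempt rather than pushing all ten values.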