When gather2 returns false, downstream.push in gather1's finisher returns false. That is right.
But why does downstream.isRejecting always return false? It should be true, as downstream is rejecting. The problem happens only when we chain 2 gatherers.
Stream.of(0)
    .gather(gather1())
    .gather(gather2())
    .forEach(System.out::println);

static Gatherer<Integer, ?, Integer> gather1() {
    return Gatherer.ofSequential(
        Gatherer.Integrator.ofGreedy((state, element, downstream) -> downstream.push(element)),
        (unused, downstream) -> {
            for (int i = 1; i <= 10 && !downstream.isRejecting(); i++) {
                System.out.println("finisher pushing " + i + ", push result " + downstream.push(i) + ", isRejecting " + downstream.isRejecting());
            }
        }
    );
}

static Gatherer<Integer, ?, Integer> gather2() {
    return Gatherer.ofSequential(
        Gatherer.Integrator.of(
            (state, element, downstream) -> false
        )
    );
}
isRejecting operates on a best-effort basis. Its documentation says:
This is best-effort only, once this returns true it should never return false again for the same instance.
So technically it is always "correct" for it to return false.
Going through the implementation, we can see that the argument passed to the downstream parameter in a Composite gatherer is just a lambda expression (source). For example, in the finisher implementation:
void finish(Downstream<? super RR> c) {
    if (leftFinisher != Gatherer.<A, R>defaultFinisher())
        leftFinisher.accept(leftState, r -> rightIntegrate(r, c));
    if (rightFinisher != Gatherer.<AA, RR>defaultFinisher())
        rightFinisher.accept(rightState, c);
}
r -> rightIntegrate(r, c) is the downstream your gather1 receives. Being a lambda, it cannot override isRejecting at all, so the default implementation is used, which returns false.
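The effect can be reproduced without any JDK stream classes. The following is a minimal sketch, not the JDK code: Sink is a hypothetical stand-in for Gatherer.Downstream with the same shape (one abstract push method plus a default isRejecting that returns false), and asLambdaSink is a made-up helper that mimics r -> rightIntegrate(r, c).

```java
import java.util.function.Predicate;

class LambdaDownstreamDemo {
    // Hypothetical stand-in for Gatherer.Downstream: one abstract method
    // plus a default isRejecting() that returns false.
    interface Sink<T> {
        boolean push(T element);

        default boolean isRejecting() {
            return false;
        }
    }

    // Mimics r -> rightIntegrate(r, c): the next stage is wrapped in a lambda,
    // so only push() is implemented and isRejecting() keeps its default.
    static <T> Sink<T> asLambdaSink(Predicate<T> rightIntegrate) {
        return r -> rightIntegrate.test(r);
    }

    public static void main(String[] args) {
        // Like gather2 in the question: the next stage refuses every element.
        Sink<Integer> sink = asLambdaSink(element -> false);
        System.out.println("push result: " + sink.push(1));        // false
        System.out.println("isRejecting: " + sink.isRejecting());  // false
    }
}
```

Even though every push is refused, the lambda has no way to report that through isRejecting, because a lambda can only supply the single abstract method.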
Looking further down, there is a comment addressing this:
Currently we use the following to ferry elements from the left Gatherer to the right Gatherer, but we create the Gatherer.Downstream as a lambda which means that the default implementation of isKnownDone() is used.

If it is determined that we want to be able to support the full interface of Gatherer.Downstream then we have the following options:
- Have State implement Downstream<? super R> and store the passed in Downstream<? super RR> downstream as an instance field in integrate() and read it in push(R r).
- Allocate a new Gatherer.Downstream<? super R> for each invocation of integrate() which might prove costly.
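To make the second option concrete, here is a rough sketch of what an allocated, non-lambda downstream could look like. It is an illustration under assumptions, not the JDK's design: Sink again stands in for Gatherer.Downstream, ForwardingSink is a hypothetical name, and the choice to remember a failed push in a rejecting flag is my own reading of what "supporting the full interface" might involve.

```java
import java.util.function.BiPredicate;

class ForwardingSinkDemo {
    // Hypothetical stand-in for Gatherer.Downstream.
    interface Sink<T> {
        boolean push(T element);

        default boolean isRejecting() {
            return false;
        }
    }

    // Hypothetical wrapper allocated per stage: it forwards pushes to the
    // right-hand integrator and overrides isRejecting(), which a lambda cannot do.
    static final class ForwardingSink<R, RR> implements Sink<R> {
        private final BiPredicate<R, Sink<RR>> rightIntegrate;
        private final Sink<RR> next;
        private boolean rejecting; // assumption: set once the right side refuses

        ForwardingSink(BiPredicate<R, Sink<RR>> rightIntegrate, Sink<RR> next) {
            this.rightIntegrate = rightIntegrate;
            this.next = next;
        }

        @Override
        public boolean push(R element) {
            if (rejecting) {
                return false;
            }
            boolean wantsMore = rightIntegrate.test(element, next);
            if (!wantsMore) {
                rejecting = true;
            }
            return wantsMore;
        }

        @Override
        public boolean isRejecting() {
            // Once true, this never flips back to false for this instance.
            return rejecting || next.isRejecting();
        }
    }

    public static void main(String[] args) {
        Sink<Integer> terminal = element -> true;
        Sink<Integer> sink =
            new ForwardingSink<Integer, Integer>((element, next) -> false, terminal);
        System.out.println("before push, isRejecting: " + sink.isRejecting()); // false
        System.out.println("push result: " + sink.push(1));                    // false
        System.out.println("after push, isRejecting: " + sink.isRejecting());  // true
    }
}
```

The extra object per stage is the allocation cost the comment refers to.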
So basically, isRejecting always returns false for downstream in a gatherer chain. A "better" implementation is not supported yet.
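Since push still reports the rejection correctly, as the question itself observes, one possible workaround is to stop on the return value of push instead of polling isRejecting. The sketch below assumes a JDK where java.util.stream.Gatherer is available without preview flags (24 or later); the attempts counter and the countFinisherPushes method are hypothetical additions used only to observe how often the finisher pushes.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class PushResultDemo {
    static Gatherer<Integer, ?, Integer> gather1(AtomicInteger attempts) {
        return Gatherer.ofSequential(
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> downstream.push(element)),
            (unused, downstream) -> {
                for (int i = 1; i <= 10; i++) {
                    attempts.incrementAndGet();
                    // Stop as soon as push reports that no more elements are wanted.
                    if (!downstream.push(i)) {
                        break;
                    }
                }
            }
        );
    }

    // Same as gather2 in the question: refuses every element.
    static Gatherer<Integer, ?, Integer> gather2() {
        return Gatherer.ofSequential(
            Gatherer.Integrator.of((state, element, downstream) -> false)
        );
    }

    // Runs the chained pipeline and returns how many pushes the finisher attempted.
    static int countFinisherPushes() {
        AtomicInteger attempts = new AtomicInteger();
        Stream.of(0)
            .gather(gather1(attempts))
            .gather(gather2())
            .forEach(System.out::println);
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println("finisher push attempts: " + countFinisherPushes());
    }
}
```

With this loop the finisher gives up after the first refused push rather than attempting all ten.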