I have the following Reactor Flux stream in Java:
Flux.just(e1, e2, e3, e4, e5)
    .flatMapSequentially(/* process elements */)
    // scan passes the accumulated value first, then the next element
    .scan(new Accumulator(null, 0), (acc, element) -> new Accumulator(element, acc.total() + element.children.size()))
    .takeWhile(/* conditionally process until, for example, e3; need accumulator value here */)
    .map(/* post processing */);

record Accumulator(Element element, int total) {}
With takeWhile, the cancellation happens conditionally at element e3. Once the stream is cancelled, that value is not available to the subsequent .map(), so the stream returns only e1 and e2. I need e3 as well, along with e1 and e2.
How can I preserve e3 conditionally?
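You can use takeUntil instead. Its predicate is the stop condition, so it is the negation of the condition you would pass to takeWhile. As its documentation says:
This includes the matching data (unlike takeWhile(java.util.function.Predicate<? super T>)).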
Example:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class TakeUntilTest {

    @Test
    public void takeUntilIncludesStopElement() {
        var datasStream = Flux.range(1, 10)
                // Stop when we encounter value 3
                .takeUntil(i -> i.equals(3));

        StepVerifier.create(datasStream)
                // Verify that the resulting flow includes the stop value
                .expectNext(1, 2, 3)
                .verifyComplete();
    }
}
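Applied to the pipeline from the question, the same idea might look like the sketch below. It is only an illustration under assumptions: the Element record with name and children fields, the upToLimit helper, and the "running total reaches a limit" stop condition are all hypothetical stand-ins for the asker's real types and predicate. Note that scan emits its seed value first, so the sketch drops it with skip(1) before any later step touches the null element.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class TakeUntilWithAccumulator {

    // Hypothetical stand-in for the asker's Element type.
    record Element(String name, List<String> children) {}

    record Accumulator(Element element, int total) {}

    // Hypothetical helper: relays running totals up to and including
    // the first one whose total reaches the given limit.
    static Flux<Accumulator> upToLimit(Flux<Element> elements, int limit) {
        return elements
                // Accumulated value comes first, next element second.
                .scan(new Accumulator(null, 0),
                        (acc, element) -> new Accumulator(
                                element, acc.total() + element.children().size()))
                // scan emits the seed (with a null element) first; drop it.
                .skip(1)
                // Stop condition: the matching element is still emitted.
                .takeUntil(acc -> acc.total() >= limit);
    }

    public static void main(String[] args) {
        Flux<Element> source = Flux.just(
                new Element("e1", List.of("a")),
                new Element("e2", List.of("b")),
                new Element("e3", List.of("c", "d")),
                new Element("e4", List.of("e")),
                new Element("e5", List.of()));

        List<String> names = upToLimit(source, 3)
                .map(acc -> acc.element().name()) // post processing
                .collectList()
                .block();

        System.out.println(names); // prints [e1, e2, e3]
    }
}
```

With the sample data the running totals are 1, 2 and 4, so the stop condition first matches at e3, and e3 still reaches the downstream map.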