I recently implemented a CSV parser in which I used switchOnFirst to extract the header, which is then passed along in an AtomicReference.
Here is a simplified version:
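```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Flux<String> lines = Flux.just("HEADER", "line1", "line2", "line3", "line4");
AtomicReference<String> header = new AtomicReference<>();
AtomicInteger count = new AtomicInteger(0);

lines.switchOnFirst((sig, flux) -> {
            String item = sig.get();
            if (sig.isOnNext() && item != null) {
                // The first element is the header: remember it and drop it from the data rows
                header.set(item);
                return flux.skip(1);
            }
            // The source was empty (or failed) before any header arrived
            return Flux.error(() -> new RuntimeException("No header was found"));
        })
        .flatMap(line -> Mono.just(header.get() + ":" + line))
        .doOnNext(x -> count.incrementAndGet())
        .subscribe(
                System.out::println,
                System.err::println,
                () -> System.out.println(count.get() + " rows processed")
        );
```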
I am aware that there is Flux.deferContextual(..), but I couldn't figure out how to read context data from the subscription (to print out "X rows processed").
Is there a best practice for this?
Would the approach above, capturing AtomicReferences, still be a better fit?
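I tried to look for information on switchOnFirst and Flux.deferContextual(..), but I haven't found anything that suggests what the best practice is for this particular scenario, in which I am trying to extract the header with switchOnFirst, make it available to every line, and report the number of rows processed once the stream completes.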
If you just want a count of the number of items processed, it looks like the count() operator would work:
```java
...
        .flatMap(line -> Mono.just(header.get() + ":" + line))
        .count()
        .doOnNext(x -> System.out.println(x + " count() rows processed"))
        .subscribe(
                System.out::println,
                System.err::println);
```
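Note that count() consumes the prefixed rows, so in the snippet above the rows themselves are no longer printed; only the total is. Below is a minimal sketch that keeps both, assuming reactor-core is on the classpath. It also drops the AtomicReference for the header by holding it in an effectively final local variable inside the switchOnFirst lambda, which the inner map can capture directly. The class name CsvHeaderCount and the helper prefixAndCount are hypothetical, chosen only for illustration.

```java
import java.util.function.Consumer;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical example class; names are illustrative only.
public class CsvHeaderCount {

    // Hypothetical helper: prefixes every data row with the header, passes each
    // row to rowSink, and emits the number of data rows when the input completes.
    static Mono<Long> prefixAndCount(Flux<String> lines, Consumer<String> rowSink) {
        return lines
                .switchOnFirst((sig, flux) -> {
                    if (sig.isOnNext()) {
                        // Effectively final local, so no AtomicReference is needed
                        String header = sig.get();
                        // The inner flux still contains the first element, so skip it
                        return flux.skip(1).map(line -> header + ":" + line);
                    }
                    // Empty input (or an upstream error): no header available
                    return Flux.error(new RuntimeException("No header was found"));
                })
                .doOnNext(rowSink) // per-row side effect, e.g. printing
                .count();          // Mono<Long>, emitted once on completion
    }

    public static void main(String[] args) {
        Flux<String> lines = Flux.just("HEADER", "line1", "line2", "line3", "line4");
        prefixAndCount(lines, System.out::println)
                .subscribe(
                        n -> System.out.println(n + " rows processed"),
                        System.err::println);
    }
}
```

Because Flux.just emits synchronously, main prints the prefixed rows and then the total on the calling thread; with an asynchronous source you would need to keep the application alive until the Mono completes (for example, by blocking in a test).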