I want to share (i.e. split) my flux, but share()
doesn't seem to cause my subscription to be shared. Why?
I have a Flux
produced by an expensive database call. I want to split that Flux and process the values it emits in different ways (without using the groupBy()
operator). Later, I want to combine the different paths again, so that I only have to subscribe once (e.g. via a REST controller):
static Flux<String> expensiveDatabaseCall() { // simulates the DB call and shares the result
    return Flux.generate(
            () -> {
                System.out.println("subscribed"); // should only happen once
                return 0;
            },
            (state, sink) -> {
                sink.next(state);
                return state + 1;
            })
            .map(String::valueOf)
            .log()
            .delayElements(Duration.ofSeconds(1))
            .take(2)
            .share(); // share the flux so the DB is only queried once
}
static Flux<String> pathA() {
    // complicated calculations
    return expensiveDatabaseCall().doOnNext(it -> System.out.println("a: " + it));
}

static Flux<String> pathB() {
    // different, equally complicated calculations
    return expensiveDatabaseCall().doOnNext(it -> System.out.println("b: " + it));
}

static Flux<String> controller() { // pretend this happens in a REST controller
    return Flux.merge(pathA(), pathB());
}
@Test
void test() {
    StepVerifier.create(controller()).expectNextCount(4).verifyComplete();
}
As I am using the share()
operator, I was expecting to see only one subscription, but in fact "subscribed"
is printed twice. Why?
Isn't the share()
operator supposed to subscribe upstream once and handle all downstream subscriptions itself, instead of passing them through to the upstream source?
That's at least how I understood the docs, which say:
Returns a new Flux that multicasts (shares) the original Flux. [...]
The explanation is very simple: any caching/sharing behavior is bound to the Flux instance. It means that when you do this:
public Flux<Integer> sharedCountdown() {
    return Flux.just(3, 2, 1, 0).share();
}

var instanceA = sharedCountdown();
var instanceB = sharedCountdown();
You create two distinct Flux instances, each with its own subscription and cache. That is exactly the same as with any other plain Java object.
If you want sharing, you have to reuse the same instance in downstream processing.
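The instance-bound nature of caching can be illustrated with a plain-Java analogy (no Reactor involved; `expensiveValue` and the memoizing `Supplier` are made-up stand-ins for the database Flux): a factory method that builds a new memoizing instance on every call gives each caller its own cache, just like a method returning a fresh `.share()`d Flux gives each caller its own upstream subscription.

```java
import java.util.function.Supplier;

class InstanceBoundCache {
    static int computations = 0; // counts how often the "expensive" work runs

    // Like a method returning Flux...share(): every call builds a NEW
    // memoizing instance, so the cache is not shared between callers.
    static Supplier<Integer> expensiveValue() {
        return new Supplier<>() {
            Integer cached;

            @Override
            public Integer get() {
                if (cached == null) {
                    computations++; // analogous to printing "subscribed"
                    cached = 42;
                }
                return cached;
            }
        };
    }
}
```

Calling `expensiveValue().get()` from two places runs the computation twice, because each call site holds its own instance; handing both call sites the same Supplier runs it once.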
In your case, you have to reverse the logic: your post-processors should not create the expensive Flux themselves; they should receive an already-assembled instance as input. Also, if you want to ensure all your downstream processors receive all upstream signals, avoid share()
(which is equivalent to publish().refCount(1) and connects as soon as the first subscriber arrives, so later subscribers can miss early signals), and use publish().autoConnect(numberOfPostProcessors) instead:
static Flux<String> expensiveDatabaseCall() { // simulates the DB call
    return Flux.generate(
            () -> {
                System.out.println("subscribed"); // should only happen once
                return 0;
            },
            (state, sink) -> {
                sink.next(state);
                return state + 1;
            })
            .map(String::valueOf)
            .log()
            .delayElements(Duration.ofSeconds(1))
            .take(2);
}
static Flux<String> pathA(Flux<String> upstreamFlux) {
    // complicated calculations
    return upstreamFlux.doOnNext(it -> System.out.println("a: " + it));
}

static Flux<String> pathB(Flux<String> upstreamFlux) {
    // different, equally complicated calculations
    return upstreamFlux.doOnNext(it -> System.out.println("b: " + it));
}
static Flux<String> controller() { // pretend this happens in a REST controller
    var sharedExpensiveUpstream = expensiveDatabaseCall().publish().autoConnect(2);
    return Flux.merge(pathA(sharedExpensiveUpstream), pathB(sharedExpensiveUpstream));
}