reactive-programmingproject-reactorfluxmulticasting

Why is Flux.share() not sharing its subscription?


I want to share (i.e. split) my flux, but share() doesn't seem to cause my subscription to be shared. Why?

I have a Flux emitted by an expensive database call. I want to split that flux and process the values it produces in different ways (but not use the groupBy() operator). Later, I want to combine the different paths again, so that I have to only subscribe once (e.g. via REST controller):

    static Flux<String> expensiveDatabaseCall() { // simulates the DB call and shares the result
        return 
            Flux.generate( 
                () -> {
                    System.out.println("subscribed"); // should only happen once
                    return 0;
                },
                (state, sink) -> {
                    sink.next(state);
                    return state + 1;
                }
            )
            .map(String::valueOf)
            .log()
            .delayElements(Duration.ofSeconds(1))
            .take(2)
            .share() // share the flux so the DB is only queried once
        ;
    }

    static Flux<String> pathA() {
        // complicated calculations
        return expensiveDatabaseCall().doOnNext(it -> System.out.println("a: " + it));
    }

    static Flux<String> pathB() {
        // different, equally complicated calculations
        return expensiveDatabaseCall().doOnNext(it -> System.out.println("b: " + it));
    }

    static Flux<String> controller() { // pretend this happens in a REST controller
        return Flux.merge(pathA(), pathB());
    }

    @Test
    void test() {
        StepVerifier.create(controller()).expectNextCount(4).verifyComplete();
    }

As I am using the share() operator, I was expecting to only see one subscription - but in fact I see two "subscribed". Why?

Isn't the share() operator supposed to subscribe upstream and to handle all downstream subscriptions itself, instead of passing them back to the upstream source?

That's at least how I understood the docs; here's what they say:

Returns a new Flux that multicasts (shares) the original Flux. [...]


Solution

  • The explanation is very simple: any caching/sharing behavior is bound to the flux instance. It means that when you do this :

    public Flux<Integer> sharedCountdown() {
        return Flux.just(3, 2, 1, 0).share();
    }
    
    var instanceA = sharedCountdown();
    var instanceB = sharedCountdown();
    

    You create two distinct flux that will each have their own subscription and cache. That is exactly the same way as any other plain java object.

    If you want sharing, you have to use the same instance in downstream processing.

    In your case, you have to reverse the logic. Your post-processors should not call/create expensive flux themselves, they should receive an assembled instance as input. Also, if you want to ensure all your downstream processors receive all upstream signals, you should avoid share(), and instead use publish().autoconnect(numberOfPostProcessors):

        static Flux<String> expensiveDatabaseCall() { // simulates the DB call and shares the result
            return 
                Flux.generate( 
                    () -> {
                       System.out.println("subscribed"); // should only happen once
                       return 0;
                    },
                    (state, sink) -> {
                       sink.next(state);
                       return state + 1;
                    }
                )
                .map(String::valueOf)
                .log()
                .delayElements(Duration.ofSeconds(1))
                .take(2)
            ;
        }
    
        static Flux<String> pathA(Flux<String> upstreamFlux) {
            // complicated calculations
            return upstreamFlux.doOnNext(it -> System.out.println("a: " + it));
        }
    
        static Flux<String> pathB(Flux<String> upstreamFlux) {
            // different, equally complicated calculations
            return upstreamFlux.doOnNext(it -> System.out.println("b: " + it));
        }
    
        static Flux<String> controller() { // pretend this happens in a REST controller
            var sharedExpensiveUpstream = expensiveDatabaseCall().publish().autoconnect(2);
            return Flux.merge(pathA(sharedExpensiveUpstream), pathB(sharedExpensiveUpstream));
        }