javaproject-reactorreactive-streams

Transforming a Connectable Flux into Hot via share operator not working


I have below program which first creates an interval flux, taking 5 elements and subscribe to it.
Post that,I convert it to connectable flux using replay operator with auto connect 2 and then convert it to hot publisher. However,any subequent subsription doesn't receive any data irrespective of any number of subscriber I add. (sleep is used here to show effect of hot stream)

        Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(5);
        flux.subscribe();
        flux = flux.replay().autoConnect(2).share();   //hot publisher     
        flux.subscribe(aLong -> System.out.println("first " + aLong));   //no data
        sleep(2000);
        flux.subscribe(aLong -> System.out.println("second " + aLong));  //no data

The catching thing is that, if I set auto connect to 1 (effectively making it a normal flux), then expected behaviour is observed (Output shown below).

Output:
first 0
first 1
first 2
second 2
first 3
second 3
first 4
second 4

Please clarify or correct me, if I am wrong.


Solution

  •  Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(2);
        flux.subscribe(aLong -> System.out.println("source " + aLong));
        flux = flux.replay().autoConnect(2).share();   //hot publisher
        flux.subscribe(aLong -> System.out.println("first " + aLong));   
        sleep(2000);
        flux.subscribe(aLong -> System.out.println("second " + aLong));
    

    output : source 0 source 1 (here only source printed because, we are calling share() after subscribe)

     Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(2);
        flux.replay().autoConnect(3).share().
            subscribe(aLong -> System.out.println("source " + aLong));   
        flux.subscribe(aLong -> System.out.println("first " + aLong));
        sleep(2000);
        flux.subscribe(aLong -> System.out.println("second " + aLong));
    

    output : first 0 first 1 second 0 second 1 (here source not printed )

    Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(2).replay().autoConnect(3);
        flux.share().subscribe(aLong -> System.out.println("source " + aLong));  
        flux.subscribe(aLong -> System.out.println("first " + aLong));
        sleep(2000);
        flux.subscribe(aLong -> System.out.println("second " + aLong));
    

    output : source 0 first 0 second 0 source 1 first 1 second 1

    Observe these 3 scenarios and its outputs,its important here where and when you are subscribed and shared