javaobservablerx-javareactive-programmingrx-java3

Are subscribers notified after all events or per event?


I am new into RxJava and I was under the impression that for each event each subscriber is being notified. So if we have N subscribers and a stream of X events the onNext for each of the N subscribers would be called. But when I run the following code:

public static void main(String[] args) {
        Observable<String> source = Observable.create(emitter -> {
            emitter.onNext("Hello");
            emitter.onNext("Foo");
            emitter.onNext("Bar");
            emitter.onNext("RxJava");
        });

        source.subscribe(e -> System.out.println("Observer 1: " + e));
        source.subscribe(e -> System.out.println("Observer 2: " + e));
    } 

I see:

Observer 1: Hello
Observer 1: Foo
Observer 1: Bar
Observer 1: RxJava
Observer 2: Hello
Observer 2: Foo
Observer 2: Bar
Observer 2: RxJava  

So basically after all the onNext are done only then the next observer is being triggered.

I was expecting to see:

Observer 1: Hello 
Observer 2: Hello
Observer 1: Foo
Observer 2: Foo
Observer 1: Bar
Observer 2: Bar
Observer 1: RxJava
Observer 2: RxJava 

That seems to me inefficient for very long streams, am I doing something wrong?


Solution

  • RxJava sequences are synchronous by default thus the subscribe call above will run your emission code right there. To achieve the interleaving, you need a way to tell the source when both consumers are ready to receive. This can be done several ways:

    ConnectableObservable<String> source = Observable.<String>create(emitter -> {
                emitter.onNext("Hello");
                emitter.onNext("Foo");
                emitter.onNext("Bar");
                emitter.onNext("RxJava");
            }).publish();
    
            source.subscribe(e -> System.out.println("Observer 1: " + e));
            source.subscribe(e -> System.out.println("Observer 2: " + e));
    
            source.connect();
    

    or

    ConnectableObservable<String> source = Observable.<String>create(emitter -> {
                emitter.onNext("Hello");
                emitter.onNext("Foo");
                emitter.onNext("Bar");
                emitter.onNext("RxJava");
            }).publish().refCount(2);
    
            source.subscribe(e -> System.out.println("Observer 1: " + e));
            source.subscribe(e -> System.out.println("Observer 2: " + e));