I am new into RxJava and I was under the impression that for each event each subscriber is being notified. So if we have N subscribers and a stream of X events the onNext
for each of the N subscribers would be called. But when I run the following code:
public static void main(String[] args) {
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("Hello");
emitter.onNext("Foo");
emitter.onNext("Bar");
emitter.onNext("RxJava");
});
source.subscribe(e -> System.out.println("Observer 1: " + e));
source.subscribe(e -> System.out.println("Observer 2: " + e));
}
I see:
Observer 1: Hello
Observer 1: Foo
Observer 1: Bar
Observer 1: RxJava
Observer 2: Hello
Observer 2: Foo
Observer 2: Bar
Observer 2: RxJava
So basically after all the onNext
are done only then the next observer is being triggered.
I was expecting to see:
Observer 1: Hello
Observer 2: Hello
Observer 1: Foo
Observer 2: Foo
Observer 1: Bar
Observer 2: Bar
Observer 1: RxJava
Observer 2: RxJava
That seems to me inefficient for very long streams, am I doing something wrong?
RxJava sequences are synchronous by default thus the subscribe call above will run your emission code right there. To achieve the interleaving, you need a way to tell the source when both consumers are ready to receive. This can be done several ways:
ConnectableObservable<String> source = Observable.<String>create(emitter -> {
emitter.onNext("Hello");
emitter.onNext("Foo");
emitter.onNext("Bar");
emitter.onNext("RxJava");
}).publish();
source.subscribe(e -> System.out.println("Observer 1: " + e));
source.subscribe(e -> System.out.println("Observer 2: " + e));
source.connect();
or
ConnectableObservable<String> source = Observable.<String>create(emitter -> {
emitter.onNext("Hello");
emitter.onNext("Foo");
emitter.onNext("Bar");
emitter.onNext("RxJava");
}).publish().refCount(2);
source.subscribe(e -> System.out.println("Observer 1: " + e));
source.subscribe(e -> System.out.println("Observer 2: " + e));