android, rx-java, rx-java2

RxJava: subscribing to many observables does not trigger onNext() for all subscribers?


When I create several observables (six in the code below, since 0..5 is an inclusive range) and subscribe to each of them with a separate subscriber, I expected each subscriber to receive its observable's data through an onNext() call:

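import io.reactivex.Observable
import io.reactivex.ObservableEmitter
import io.reactivex.ObservableOnSubscribe
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.schedulers.Schedulers

val compositeSubscription = CompositeDisposable()

fun test() {
    // 0..5 is an inclusive range, so this loop creates six observables.
    for (i in 0..5) {
        compositeSubscription.add(
            Observable.create<String>(object : ObservableOnSubscribe<String> {
                override fun subscribe(emitter: ObservableEmitter<String>) {
                    emitter.onNext("somestring")
                    emitter.onComplete()
                }
            })
                .subscribeOn(Schedulers.computation())      // emit on a computation thread
                .observeOn(AndroidSchedulers.mainThread())  // deliver on the Android main thread
                .subscribe({
                    Logger.i("testIt onNext")
                }, {
                    Logger.i("testIt onError")
                })
        )
    }
}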

However, I see only one or two "testIt onNext" lines in the log.

When I add a delay in the subscribers' onNext(), onNext() is called for all 6 subscribers.

This looks like a race condition in which some subscribers are not fast enough to catch their data. How this could happen escapes me, since subscribe() should only be called once the subscriber is up and running.

Would be grateful for any tips on this.


Solution

  • Judging from this code, every subscriber should print "testIt onNext". Are you sure it is not being printed? Maybe Android Studio is collapsing identical log lines. Have you tried printing something different for each subscriber?
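    One way to try the suggestion above is to tag each log line with the loop index, so identical messages cannot be merged by the log viewer. The following is only a minimal sketch, assuming RxJava 2 is on the classpath; collectLabelledEmissions is a hypothetical helper name, and the Android main-thread scheduler and Logger are left out so that it can run on a plain JVM.

```kotlin
import io.reactivex.Observable
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical helper (not from the question): subscribes `count` times and
// returns one labelled message per event that a subscriber received.
fun collectLabelledEmissions(count: Int): List<String> {
    val disposables = CompositeDisposable()
    val received = CopyOnWriteArrayList<String>() // safe to append from several threads
    val done = CountDownLatch(count)

    for (i in 0 until count) {
        disposables.add(
            Observable.create<String> { emitter ->
                emitter.onNext("somestring")
                emitter.onComplete()
            }
                .subscribeOn(Schedulers.computation())
                // Count down on completion, error, or disposal alike.
                .doFinally { done.countDown() }
                .subscribe(
                    // The index makes every message distinct.
                    { value -> received.add("subscriber $i onNext: $value") },
                    { error -> received.add("subscriber $i onError: $error") }
                )
        )
    }

    // Wait (with a timeout) until every subscription has terminated.
    done.await(5, TimeUnit.SECONDS)
    disposables.clear()
    return received.sorted()
}

fun main() {
    collectLabelledEmissions(6).forEach(::println)
}
```

    In the Android code from the question, the same idea amounts to logging "testIt onNext $i" instead of a constant string. If six distinct lines then show up, the original subscribers were all being called and only the log output was being collapsed.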