java kotlin rx-java3

Why doesn't subscribeOn have any effect on a PublishSubject in RxJava?


This is my test code in Kotlin:

fun main() {
    rxjava()
}

fun rxjava() {
    val queuSubject = PublishSubject.create<String>()
    queuSubject
        .map { t ->
            val a = t.toLong()
            Thread.sleep(6000 / a)
            println("map $a called ${Thread.currentThread().name} ")
            a
        }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe({
            println("thread in subscription ${Thread.currentThread().name}")
        }, {
            println("error ${it.message}")
        })
    for (i in 1..3) {
        Thread {
            queuSubject.onNext("$i")
        }.start()
    }
    Thread.sleep(15000)
}

I'm trying to run the map block and the subscriber's onNext block on different IO threads, but the output looks like this:

map 3 called Thread-2 
thread in subscription RxCachedThreadScheduler-2
map 2 called Thread-1 
thread in subscription RxCachedThreadScheduler-2
map 1 called Thread-0 
thread in subscription RxCachedThreadScheduler-2

As you can see, calling subscribeOn seems to have no effect on the PublishSubject's stream: Thread-0, Thread-1 and Thread-2 are the threads that call onNext.

Additionally, consider the code below:

fun main() {
    rxjava()
}

fun rxjava() {
    val queuSubject = PublishSubject.create<String>()
    queuSubject
        .map { t ->
            val a = t.toLong()
            Thread.sleep(6000 / a)
            println("map $a called ${Thread.currentThread().name} ")
            a
        }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe({
            println("thread in subscription ${Thread.currentThread().name}")
        }, {
            println("error ${it.message}")
        })
    queuSubject.onNext("1")
    queuSubject.onNext("2")
    queuSubject.onNext("3")
    Thread.sleep(15000)
}

With the code above, no output is printed at all. But if I remove subscribeOn from the stream, the messages are printed sequentially, like this:

map 1 called main 
thread in subscription RxCachedThreadScheduler-1
map 2 called main 
thread in subscription RxCachedThreadScheduler-1
map 3 called main 
thread in subscription RxCachedThreadScheduler-1

What is wrong with these two snippets? Thanks.


Solution

  • subscribeOn only affects the subscription side effects of a source. Such a side effect occurs when the source starts emitting events as soon as an observer subscribes:

    Observable.just(1, 2, 3)
        .subscribeOn(Schedulers.io())
        // just() emits while subscribing, so this prints an io() worker thread
        .doOnNext(v -> System.out.println(Thread.currentThread() + " - " + v))
        .blockingSubscribe();
    

    PublishSubject has no subscription side effects: it only relays signals from its onXXX methods to the observers' onXXX methods, on whatever thread calls them.

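    However, subscribeOn does have a timing effect: it delays the actual subscription to the source. With a PublishSubject, the observer may therefore not be registered yet when another thread calls the subject's onXXX methods, and those signals are lost. This is most likely why your second snippet prints nothing: the three onNext calls on the main thread can finish before the subscription has happened on the IO thread.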

    If you want to move the processing off the original thread, use observeOn:

    val queuSubject = PublishSubject.create<String>()
    queuSubject
        .observeOn(Schedulers.io()) // <-- moves map off the thread that calls onNext
        .map { t ->
            val a = t.toLong()
            Thread.sleep(6000 / a)
            println("map $a called ${Thread.currentThread().name} ")
            a
        }
        .observeOn(Schedulers.io()) // subscriber callbacks run on their own io() worker
        .subscribe({
            println("thread in subscription ${Thread.currentThread().name}")
        }, {
            println("error ${it.message}")
        })
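
To make the two effects above concrete without depending on RxJava, here is a minimal plain-Java sketch of the mechanics. It is a toy model, not RxJava's implementation: `ToySubject`, the `subscribeOn` and `observeOn` helpers, and the thread names `producer` and `toy-io` are all hypothetical names made up for this illustration. The first demo holds the worker busy on purpose so that the "subscribed too late" race becomes deterministic.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ToySchedulingDemo {

    /** Hypothetical stand-in for a hot subject: relays values on the caller's thread. */
    static final class ToySubject {
        private final List<Consumer<String>> observers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> observer) {
            observers.add(observer); // registering is all that subscribing does here
        }

        void onNext(String value) {
            for (Consumer<String> observer : observers) {
                observer.accept(value); // runs on whichever thread called onNext
            }
        }
    }

    /** Toy subscribeOn: only the subscribe call itself moves to the executor. */
    static Future<?> subscribeOn(ToySubject subject, ExecutorService executor,
                                 Consumer<String> observer) {
        return executor.submit(() -> subject.subscribe(observer));
    }

    /** Toy observeOn: every value is handed over to the executor before delivery. */
    static Consumer<String> observeOn(ExecutorService executor, Consumer<String> downstream) {
        return value -> executor.execute(() -> downstream.accept(value));
    }

    /** Emits one value from a thread named "producer" and waits for the call to return. */
    static void emitFromProducer(ToySubject subject, String value) throws InterruptedException {
        Thread producer = new Thread(() -> subject.onNext(value), "producer");
        producer.start();
        producer.join();
    }

    static List<String> subscribeOnDemo() throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "toy-io"));
        ToySubject subject = new ToySubject();
        List<String> seen = new CopyOnWriteArrayList<>();

        // Keep the worker busy so the deferred subscribe cannot run yet; this makes
        // the normally racy "subscribed too late" case deterministic.
        CountDownLatch gate = new CountDownLatch(1);
        io.submit(() -> { gate.await(); return null; });
        Future<?> subscribed = subscribeOn(subject, io,
                v -> seen.add(v + " on " + Thread.currentThread().getName()));

        emitFromProducer(subject, "early"); // nobody registered yet: dropped
        gate.countDown();
        subscribed.get();                   // registration has now happened
        emitFromProducer(subject, "late");  // delivered, but on the producer thread
        io.shutdown();
        return seen;
    }

    static List<String> observeOnDemo() throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "toy-io"));
        ToySubject subject = new ToySubject();
        List<String> seen = new CopyOnWriteArrayList<>();

        subject.subscribe(observeOn(io,
                v -> seen.add(v + " on " + Thread.currentThread().getName())));
        emitFromProducer(subject, "value");
        io.shutdown();                            // already queued work still runs
        io.awaitTermination(5, TimeUnit.SECONDS);
        return seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(subscribeOnDemo()); // [late on producer]
        System.out.println(observeOnDemo());   // [value on toy-io]
    }
}
```

In this model the subscribeOn-style helper changes only when and where registration happens, so "early" is lost and "late" is still handled on the producer thread. The observeOn-style wrapper is what actually moves the handling onto the `toy-io` thread. In real RxJava code, a way to avoid the lost emissions would be to drop subscribeOn on the subject, or to start emitting only after `PublishSubject.hasObservers()` returns true.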