This is my test code in Kotlin:
fun main() {
    rxjava()
}

fun rxjava() {
    val queuSubject = PublishSubject.create<String>()
    queuSubject
        .map { t ->
            val a = t.toLong()
            Thread.sleep(6000 / a)
            println("map $a called ${Thread.currentThread().name} ")
            a
        }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe({
            println("thread in subscription ${Thread.currentThread().name}")
        }, {
            println("error ${it.message}")
        })
    for (i in 1..3) {
        Thread {
            queuSubject.onNext("$i")
        }.start()
    }
    Thread.sleep(15000)
}
I'm trying to run the map block and the subscribe onNext block on different IO threads, but the output looks like this:
map 3 called Thread-2
thread in subscription RxCachedThreadScheduler-2
map 2 called Thread-1
thread in subscription RxCachedThreadScheduler-2
map 1 called Thread-0
thread in subscription RxCachedThreadScheduler-2
As you can see, calling subscribeOn seems to have no effect on the PublishSubject stream: Thread-0, Thread-1 and Thread-2 are the threads that call onNext.
Additionally, consider the code below:
fun main() {
    rxjava()
}

fun rxjava() {
    val queuSubject = PublishSubject.create<String>()
    queuSubject
        .map { t ->
            val a = t.toLong()
            Thread.sleep(6000 / a)
            println("map $a called ${Thread.currentThread().name} ")
            a
        }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe({
            println("thread in subscription ${Thread.currentThread().name}")
        }, {
            println("error ${it.message}")
        })
    queuSubject.onNext("1")
    queuSubject.onNext("2")
    queuSubject.onNext("3")
    Thread.sleep(15000)
}
With the code above, nothing is printed at all. But if I remove subscribeOn from the stream, the messages are printed sequentially, like this:
map 1 called main
thread in subscription RxCachedThreadScheduler-1
map 2 called main
thread in subscription RxCachedThreadScheduler-1
map 3 called main
thread in subscription RxCachedThreadScheduler-1
What is wrong with this code? Thanks.
Because subscribeOn only affects the subscription side-effects of a source. Such a side-effect occurs when the source starts emitting events as soon as an observer subscribes:
Observable.just(1, 2, 3)
    .subscribeOn(Schedulers.io())
    .doOnNext(v -> System.out.println(Thread.currentThread() + " - " + v))
    .blockingSubscribe();
PublishSubject has no subscription side-effect: it only relays signals from its onXXX methods to the observers' onXXX methods.
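The difference can be seen by recording which thread runs map in each case. This is a minimal sketch, assuming RxJava 3 package names (io.reactivex.rxjava3); the function names coldSourceThread and subjectThread are made up for illustration. The hasObservers() polling loop is only there so the observer is definitely attached before onNext is called.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.CopyOnWriteArrayList

// Cold source: subscribing is what triggers the emission,
// so subscribeOn decides which thread emits and runs map.
fun coldSourceThread(): String =
    Observable.just(1)
        .subscribeOn(Schedulers.io())
        .map { Thread.currentThread().name }
        .blockingFirst()

// PublishSubject: the emission happens on whichever thread calls onNext,
// regardless of subscribeOn.
fun subjectThread(): String {
    val seen = CopyOnWriteArrayList<String>()
    val subject = PublishSubject.create<Int>()
    subject
        .subscribeOn(Schedulers.io())
        .map { Thread.currentThread().name }
        .subscribe { seen.add(it) }

    // subscribeOn attaches the observer asynchronously; wait until it is registered.
    while (!subject.hasObservers()) Thread.sleep(1)

    subject.onNext(1) // relayed synchronously on the calling thread
    return seen.single()
}

fun main() {
    println("cold source mapped on: ${coldSourceThread()}")
    println("subject mapped on:     ${subjectThread()}")
}
```

The first function should report an RxCachedThreadScheduler thread, while the second reports the caller's own thread, which matches the Thread-0, Thread-1 and Thread-2 names in the question's output.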
However, subscribeOn does have a timing effect: it delays the actual subscription to the source. With a PublishSubject, the observer might therefore not be registered yet when some other thread calls its onXXX methods, and those signals are dropped.
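This timing effect is a race in the question's second snippet, but it can be made deterministic with a TestScheduler, which runs scheduled work only when told to. The following is a sketch under the assumption of RxJava 3 package names; lostAndReceived is a hypothetical helper name.

```kotlin
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject

fun lostAndReceived(): List<String> {
    val received = mutableListOf<String>()
    val scheduler = TestScheduler()
    val subject = PublishSubject.create<String>()

    subject
        .subscribeOn(scheduler) // the subscription to the subject is only queued here
        .subscribe { received.add(it) }

    // The queued subscription task has not run yet, so nobody is listening.
    check(!subject.hasObservers())
    subject.onNext("1") // dropped: PublishSubject does not replay to late observers

    scheduler.triggerActions() // now the deferred subscription actually happens
    check(subject.hasObservers())
    subject.onNext("2") // delivered

    return received
}

fun main() {
    println(lostAndReceived()) // prints [2]
}
```

With Schedulers.io() the same thing happens, except that whether "1" is dropped depends on how quickly the IO thread gets around to subscribing.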
If you want to move the processing off the original thread, use observeOn:
val queuSubject = PublishSubject.create<String>()
queuSubject
    .observeOn(Schedulers.io()) // <----------------------------------------
    .map { t ->
        val a = t.toLong()
        Thread.sleep(6000 / a)
        println("map $a called ${Thread.currentThread().name} ")
        a
    }
    .observeOn(Schedulers.io())
    .subscribe({
        println("thread in subscription ${Thread.currentThread().name}")
    }, {
        println("error ${it.message}")
    })
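To check this arrangement without reading console output, the thread names can be collected instead of printed. This is only a sketch, again assuming RxJava 3 package names; Trace and traceObserveOn are hypothetical names, and the Thread.sleep from the question is left out to keep the run short.

```kotlin
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical holder for what the sketch records.
data class Trace(
    val values: List<Long>,
    val mapThreads: List<String>,
    val subscriberThreads: List<String>
)

fun traceObserveOn(): Trace {
    val values = CopyOnWriteArrayList<Long>()
    val mapThreads = CopyOnWriteArrayList<String>()
    val subscriberThreads = CopyOnWriteArrayList<String>()
    val done = CountDownLatch(3)

    val subject = PublishSubject.create<String>()
    subject
        .observeOn(Schedulers.io()) // everything below runs off the emitting thread
        .map { t ->
            mapThreads.add(Thread.currentThread().name)
            t.toLong()
        }
        .observeOn(Schedulers.io()) // hand over again for the subscriber
        .subscribe({ v ->
            values.add(v)
            subscriberThreads.add(Thread.currentThread().name)
            done.countDown()
        }, { e ->
            println("error ${e.message}")
        })

    // No subscribeOn, so the observer is already registered when these calls happen.
    subject.onNext("1")
    subject.onNext("2")
    subject.onNext("3")

    done.await(5, TimeUnit.SECONDS)
    return Trace(values.toList(), mapThreads.toList(), subscriberThreads.toList())
}

fun main() {
    println(traceObserveOn())
}
```

Because the subscription is synchronous here, the three values emitted from the main thread right after subscribe are not lost, and both the map stage and the subscriber report RxCachedThreadScheduler threads. Note that each observeOn stage still processes its items one at a time, in order.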