scala rx-java observable rx-scala

RxScala: subscribing multiple Observers only emits events to the first one


I am trying to subscribe multiple Observers to an Observable whose onNext is called inside a loop. It does not seem to work for every Observer.

import rx.lang.scala.Observable

object SubscribeMultiEvent extends App{
  val obv = Observable.apply[String]{ s =>
    def printForever: Unit = {
      s.onNext("hi~")
      Thread.sleep(1000)
      printForever
    }
    printForever
  }

  obv.subscribe(s => println(s"first observer - $s"))
  obv.subscribe(s => println(s"second observer - $s"))

  Thread.currentThread().join()
}

Only the first Observer prints anything:

first observer - hi~
first observer - hi~
...

Why doesn't the second Observer receive any events? Thanks.


Solution

  • The problem in your code is that your Observable is synchronous: the function passed to Observable.apply runs on the thread that calls subscribe. This means the second subscribe won't run until the first subscribe returns. Since your Observable never completes, the first subscribe never returns and the second one is never reached.

    To fix this issue, you need to make your Observable asynchronous. You can use subscribeOn to run each subscription on another thread, e.g.:

    import rx.lang.scala.Observable
    import rx.lang.scala.schedulers.NewThreadScheduler
    
    object SubscribeMultiEvent extends App{
      val obv = Observable.apply[String]{ s =>
        def printForever: Unit = {
          s.onNext("hi~")
          Thread.sleep(1000)
          printForever
        }
        printForever
      }.subscribeOn(NewThreadScheduler())
    
      obv.subscribe(s => println(s"first observer - $s"))
      obv.subscribe(s => println(s"second observer - $s"))
    
      Thread.sleep(60000)
    }
    

    Thread.sleep(60000) at the end is important. RxJava's threads are daemon threads by default, so once the main thread finishes there are no non-daemon threads left and the JVM exits. To keep the main thread alive, you need to add something like Thread.sleep(60000).
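
To see the blocking behaviour in isolation, here is a minimal sketch that uses only the standard library. ToySource and ToySubscribeDemo are hypothetical names invented for this illustration; they are not part of RxScala and only mimic the idea of running the emitter either on the caller's thread or on a new daemon thread. The emitter is finite (three events) so that the synchronous case terminates, and a CountDownLatch is used as one possible alternative to a fixed Thread.sleep.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable.ListBuffer

// Toy stand-in for an Observable (hypothetical, not RxScala's implementation):
// it just stores the emitter function and runs it once per subscriber.
final class ToySource(emitter: (String => Unit) => Unit, async: Boolean) {
  def subscribe(onNext: String => Unit): Unit =
    if (async) {
      // Roughly the idea behind subscribeOn(NewThreadScheduler()):
      // each subscription runs on its own thread.
      val t = new Thread(new Runnable { def run(): Unit = emitter(onNext) })
      t.setDaemon(true) // daemon, so it cannot keep the JVM alive by itself
      t.start()
    } else {
      // Synchronous: runs on the caller's thread and returns
      // only when the emitter itself returns.
      emitter(onNext)
    }
}

object ToySubscribeDemo {
  // Subscribes two observers and returns the events in arrival order.
  def run(async: Boolean): List[String] = {
    val log = ListBuffer.empty[String]
    val done = new CountDownLatch(2) // one count per observer

    val source = new ToySource({ onNext =>
      (1 to 3).foreach(i => onNext(s"hi~ $i")) // finite, unlike printForever
      done.countDown()
    }, async)

    source.subscribe(s => log.synchronized { log += s"first - $s" })
    source.subscribe(s => log.synchronized { log += s"second - $s" })

    // Wait until both emitters have finished instead of sleeping a fixed time.
    done.await(10, TimeUnit.SECONDS)
    log.synchronized(log.toList)
  }
}
```

With ToySubscribeDemo.run(async = false), all three "first" events are logged before any "second" event, because the second subscribe call only starts after the first one has returned. With an emitter that never returns, like printForever, the second call would never start at all. With run(async = true), both observers receive their three events, but the interleaving between them is not fixed.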