RxScala Observables with replay


I'm trying to understand replay in RxScala. I create an observable like this:

lazy val toyObservable : Observable[Int] = {
    val coldObservable : Observable[Int] = intPerSecond
    val hotObservable : ConnectableObservable[Int] = coldObservable.publish
    val cachedObservable = hotObservable //.replay(3)   //<<<<<<<<< ODD THING 
    cachedObservable.connect
    cachedObservable
}

where intPerSecond shoots out one integer per second, starting at 0. The first observer to subscribe indeed sees one integer per second. If the second observer joins in at t=6 seconds, then from that point they both see a matching stream 6...7...8...9... at one second intervals. That's as expected.

Now if I add in the .replay(3), I'd expect that when the second observer joins, he'd see 3456...7...8...9, i.e. he'd immediately get 3 integers from the cache, then receive the rest at one per second as they are produced. But instead, neither observer now sees anything. Do I have the syntax wrong?


Solution

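  • You forgot to call hotObservable.connect. replay(3) wraps hotObservable in a second ConnectableObservable, so connecting cachedObservable only subscribes the replay buffer to hotObservable. hotObservable itself must also be connected before it subscribes to intPerSecond and starts emitting. The following code outputs exactly what you want: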

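    import rx.lang.scala._
    import rx.lang.scala.observables._
    import scala.concurrent.duration._

    // Cold source: one Int per second, starting at 0.
    val intPerSecond = Observable.interval(1.seconds).map(_.toInt)
    val coldObservable: Observable[Int] = intPerSecond
    val hotObservable: ConnectableObservable[Int] = coldObservable.publish
    // replay(3) returns a new ConnectableObservable that buffers the last 3 items.
    val cachedObservable = hotObservable.replay(3)
    cachedObservable.connect  // subscribes the replay buffer to hotObservable
    hotObservable.connect     // subscribes hotObservable to the source; emission starts here
    cachedObservable.foreach(i => println(s"1: $i"))
    Thread.sleep(6000)
    // The second subscriber first receives the buffered items, then the live ones.
    cachedObservable.foreach(i => println(s"2: $i"))
    // Keep the JVM alive when run as a standalone program (not needed in the REPL).
    Thread.sleep(3000)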
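If you want to check the replay behaviour without waiting in real time, the same pipeline can be driven by a virtual clock. The following is only a sketch, assuming RxScala's TestScheduler (rx.lang.scala.schedulers.TestScheduler) and the interval overload that takes a scheduler; the names ReplayDemo and run are made up for illustration and do not come from the question.

```scala
import rx.lang.scala.Observable
import rx.lang.scala.observables.ConnectableObservable
import rx.lang.scala.schedulers.TestScheduler
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._

// Hypothetical helper object, not part of the original question or answer.
object ReplayDemo {
  // Returns the items seen by the first and the second subscriber.
  def run(): (List[Int], List[Int]) = {
    // Virtual clock: time only moves when advanceTimeBy is called.
    val scheduler = TestScheduler()
    val intPerSecond: Observable[Int] =
      Observable.interval(1.second, scheduler).map(_.toInt)

    val hotObservable: ConnectableObservable[Int] = intPerSecond.publish
    val cachedObservable: ConnectableObservable[Int] = hotObservable.replay(3)
    cachedObservable.connect // replay buffer subscribes to hotObservable
    hotObservable.connect    // hotObservable subscribes to the source

    val first = ListBuffer.empty[Int]
    val second = ListBuffer.empty[Int]

    cachedObservable.foreach { i => first += i; () }
    scheduler.advanceTimeBy(6.seconds) // source emits 0 to 5
    cachedObservable.foreach { i => second += i; () } // joins late, gets the buffer
    scheduler.advanceTimeBy(2.seconds) // source emits 6 and 7

    (first.toList, second.toList)
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = run()
    println(s"1: $first")
    println(s"2: $second")
  }
}
```

Under these assumptions the first subscriber should collect 0 through 7, while the late subscriber should start with the three buffered items 3, 4, 5 and then continue with 6 and 7, which is the behaviour the question expected from replay(3).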