I'm trying to understand replay in RxScala. I create an observable like this:
lazy val toyObservable : Observable[Int] = {
  val coldObservable : Observable[Int] = intPerSecond
  val hotObservable : ConnectableObservable[Int] = coldObservable.publish
  val cachedObservable = hotObservable //.replay(3) //<<<<<<<<< ODD THING
  cachedObservable.connect
  cachedObservable
}
where intPerSecond shoots out one integer per second, starting at 0. The first observer to subscribe indeed sees one integer per second. If the second observer joins in at t=6 seconds, then from that point they both see a matching stream 6...7...8...9... at one-second intervals. That's as expected.
Now if I add in the .replay(3), I'd expect that when the second observer joins, he'd see 3456...7...8...9, i.e. he'd immediately get 3 integers from the cache, then receive the rest at one per second as they are produced. But instead, neither observer now sees anything. Do I have the syntax wrong?
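You forgot to call hotObservable.connect. With .replay(3) in place, cachedObservable is a second ConnectableObservable wrapped around hotObservable, so cachedObservable.connect only connects the replay stage; hotObservable itself still has to be connected to the cold source before anything is emitted. The following code outputs exactly what you want: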
import rx.lang.scala._
import rx.lang.scala.observables._
import scala.concurrent.duration._
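// Emits 0, 1, 2, ... at one-second intervals
val intPerSecond = Observable.interval(1.seconds).map(_.toInt)
val coldObservable : Observable[Int] = intPerSecond
val hotObservable : ConnectableObservable[Int] = coldObservable.publish
val cachedObservable = hotObservable.replay(3)
// Connect the replay stage to the published stream
cachedObservable.connect
// Connect the published stream to the cold source; without this nothing is emitted
hotObservable.connect
cachedObservable.foreach(i => println(s"1: $i"))
Thread.sleep(6000)
// The second subscriber first receives the buffered values, then the live ones
cachedObservable.foreach(i => println(s"2: $i"))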
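If the timing makes the behaviour hard to check, here is a minimal sketch of the same replay(3) semantics with the timer swapped for a PublishSubject that is fed by hand, so the result does not depend on the clock. The names source, cached, first and second are only for illustration and are not part of the question's code. It assumes the RxScala library is on the classpath.

```scala
import rx.lang.scala.subjects.PublishSubject
import scala.collection.mutable.ListBuffer

// Hand-driven source standing in for intPerSecond (an assumption for this sketch)
val source = PublishSubject[Int]()

// replay(3) returns a ConnectableObservable that buffers the last 3 values
val cached = source.replay(3)
cached.connect

val first = ListBuffer.empty[Int]
val second = ListBuffer.empty[Int]

// First observer subscribes before anything is emitted
cached.foreach(i => first.append(i))
(0 to 5).foreach(i => source.onNext(i))

// Second observer joins late: it is handed the buffered 3, 4, 5 immediately
cached.foreach(i => second.append(i))
(6 to 8).foreach(i => source.onNext(i))

println(s"1: ${first.mkString(" ")}")  // 1: 0 1 2 3 4 5 6 7 8
println(s"2: ${second.mkString(" ")}") // 2: 3 4 5 6 7 8
```

Note that only one connect is needed here because there is no separate publish stage. The same should hold for the original code: calling coldObservable.replay(3) directly and connecting that once would avoid the second connect, since replay already shares a single subscription among all observers.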