I'm trying to configure the behavior of a Monix Observable
to meet a particular set of requirements.
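Basically I'm trying to get it so:
1. The Observable is shared between subscribers (multicast, as with .refCount)
2. Once all subscribers have unsubscribed, the Observable stops
3. When a new subscriber comes along after that, the Observable restarts again (like a cold observable)

Here's my code: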
import monix.reactive.Observable
import scala.concurrent.duration._
import monix.execution.Scheduler.Implicits.global

object Main {
  def main(args: Array[String]): Unit = {
    val observable = Observable.interval(1.second).map { i => println("Obs running"); i }.share

    val subscription1 = observable.foreach(n => println(s"sub1: $n"))
    Thread.sleep(2000)
    val subscription2 = observable.foreach(n => println(s"sub2: $n"))
    Thread.sleep(5000)

    subscription1.cancel()
    println("Cancel 1")
    Thread.sleep(3000)
    subscription2.cancel()
    println("Cancel 2")

    // All subscribers unsubscribed, observable stops emitting values and is disposed of
    println("Sleep")
    Thread.sleep(5000)

    // After some time, a new subscriber comes along
    val subscription3 = observable.foreach(n => println(s"sub3: $n"))
    println("Subscribed to 3")
    Thread.sleep(7000)
  }
}
SBT:
scalaVersion := "3.4.0"
libraryDependencies += "io.monix" %% "monix-reactive" % "3.4.1"
Here's the output of my script:
Obs running
sub1: 0
Obs running
sub1: 1
Obs running
sub1: 2
Obs running
sub1: 3
sub2: 3
Obs running
sub1: 4
sub2: 4
Obs running
sub1: 5
sub2: 5
Obs running
sub1: 6
sub2: 6
Obs running
sub1: 7
sub2: 7
Cancel 1
Obs running
sub2: 8
Obs running
sub2: 9
Cancel 2
Sleep
Subscribed to 3
By using .share I was able to get the observable to be multicast and to stop firing once sub1 and sub2 unsubscribe (because .share uses .refCount under the hood), but when sub3 subscribes it never receives any items.
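To make the .share remark concrete: as far as I can tell, in Monix 3.x .share is shorthand for .publish.refCount, so the two values below should behave the same way. This is only a minimal sketch; the object and value names (ShareSketch, viaShare, viaRefCount) are made up for illustration.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._

// Hypothetical names, used only to compare the two spellings.
object ShareSketch {
  val source: Observable[Long] = Observable.interval(1.second)

  // Shorthand form, as used in the question.
  val viaShare: Observable[Long] = source.share

  // Spelled out: publish multicasts the source through a subject,
  // and refCount connects on the first subscriber and cancels the
  // connection once the last subscriber has unsubscribed.
  val viaRefCount: Observable[Long] = source.publish.refCount
}
```

Neither value starts the interval until something subscribes to it.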
Also note that this is a simplified example. In my real use case the Observable isn't just a sequence of ints; it runs a query on an external system and returns the results, but the issue is essentially the same.
Would greatly appreciate any help on this. Thanks!
After an intense jam session with Claude Opus, we were able to come up with a solution:
import monix.execution.atomic.Atomic
import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive.Observable
import monix.reactive.observers.Subscriber
import monix.reactive.subjects.PublishSubject
import scala.concurrent.Future

class RestartableRefCountObservable[A] private (source: Observable[A]) extends Observable[A] {
  // Handle to the current upstream subscription. Starts as a no-op so that
  // cancel() is safe even if the source terminates before it is assigned.
  @volatile private var sourceConnection: Cancelable = Cancelable.empty
  // Fans each upstream element out to all current subscribers.
  private val subject: PublishSubject[A] = PublishSubject()
  // Number of currently active downstream subscribers.
  private val connectionCount = Atomic(0)

  override def unsafeSubscribeFn(subscriber: Subscriber[A]): Cancelable = {
    // Subscribes to the source and forwards its events to the subject.
    def startConnection(): Unit = {
      sourceConnection = source.subscribe(new Subscriber[A] {
        override def onNext(elem: A): Future[Ack] = subject.onNext(elem)

        override def onError(ex: Throwable): Unit = {
          sourceConnection.cancel()
          subject.onError(ex)
        }

        override def onComplete(): Unit = {
          sourceConnection.cancel()
          subject.onComplete()
        }

        override implicit def scheduler: Scheduler = subscriber.scheduler
      })
    }

    // The first subscriber (count 0 -> 1) opens the upstream connection.
    if (connectionCount.incrementAndGet() == 1) startConnection()
    val subjectSubscription = subject.unsafeSubscribeFn(subscriber)

    Cancelable { () =>
      subjectSubscription.cancel()
      // The last subscriber leaving (count 1 -> 0) closes the upstream connection.
      if (connectionCount.decrementAndGet() == 0) sourceConnection.cancel()
    }
  }
}

object RestartableRefCountObservable {
  def apply[A](source: Observable[A]): RestartableRefCountObservable[A] =
    new RestartableRefCountObservable(source)
}
The solution was to implement a new class that extends Observable and internally keeps track of how many subscribers it has. On the first subscription it connects to the source Observable, and once all subscribers have unsubscribed it cancels the connection to the source. The crucial difference from monix.reactive.observables.RefCountObservable is that this one restarts if a new subscriber subscribes after all previous subscribers have unsubscribed.
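The counting rule itself can be sketched without any Monix types, which makes the restart behavior easier to see. This is a simplified model and not part of Monix: RefCountGate, acquire, release and RefCountGateDemo are hypothetical names, and the connect/disconnect callbacks stand in for subscribing to and cancelling the source.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer

// Hypothetical, Monix-free model of the subscriber-counting rule.
final class RefCountGate(connect: () => Unit, disconnect: () => Unit) {
  private val subscribers = new AtomicInteger(0)

  // Registers a subscriber; connects on the 0 -> 1 transition.
  def acquire(): Unit =
    if (subscribers.incrementAndGet() == 1) connect()

  // Removes a subscriber; disconnects on the 1 -> 0 transition.
  def release(): Unit =
    if (subscribers.decrementAndGet() == 0) disconnect()
}

object RefCountGateDemo {
  // Replays the sub1 / sub2 / sub3 scenario and records connection events.
  def run(): List[String] = {
    val log = ListBuffer.empty[String]
    val gate = new RefCountGate(
      () => { log += "connect"; () },
      () => { log += "disconnect"; () }
    )
    gate.acquire() // sub1 subscribes: connects to the source
    gate.acquire() // sub2 subscribes: shares the existing connection
    gate.release() // sub1 cancels: sub2 is still active
    gate.release() // sub2 cancels: disconnects from the source
    gate.acquire() // sub3 subscribes: connects again
    log.toList
  }
}
```

Because the count can go back from 0 to 1 any number of times, a late subscriber always triggers a fresh connection instead of attaching to a connection that has already been cancelled.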
Here's an updated version of the script I was using to test before:
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration.*

object Main {
  def main(args: Array[String]): Unit = {
    val source = Observable.interval(1.second).map(i => {
      println(s"Obs running: $i")
      i
    })
    val restartable = RestartableRefCountObservable(source)

    println("Sub1")
    val sub1 = restartable.foreach(i => println(s"Sub1: $i"))
    Thread.sleep(3000)

    println("Sub2")
    val sub2 = restartable.foreach(i => println(s"Sub2: $i"))
    Thread.sleep(5000)

    sub1.cancel()
    println("Sub1 canceled")
    Thread.sleep(3000)

    sub2.cancel()
    println("Sub2 canceled")
    Thread.sleep(10000)

    println("Sub3")
    val sub3 = restartable.foreach(i => println(s"Sub3: $i"))
    Thread.sleep(15000)
  }
}
And here's the new output:
Sub1
Obs running: 0
Sub1: 0
Obs running: 1
Sub1: 1
Obs running: 2
Sub1: 2
Obs running: 3
Sub1: 3
Sub2
Sub2: 3
Obs running: 4
Sub1: 4
Sub2: 4
Obs running: 5
Sub1: 5
Sub2: 5
Obs running: 6
Sub1: 6
Sub2: 6
Obs running: 7
Sub1: 7
Sub2: 7
Obs running: 8
Sub1: 8
Sub2: 8
Sub1 canceled
Obs running: 9
Sub2: 9
Obs running: 10
Sub2: 10
Obs running: 11
Sub2: 11
Sub2 canceled
Sub3
Obs running: 0
Sub3: 0
Obs running: 1
Sub3: 1
Obs running: 2
Sub3: 2
Obs running: 3
Sub3: 3
Obs running: 4
Sub3: 4
Obs running: 5
Sub3: 5
Obs running: 6
Sub3: 6
Obs running: 7
Sub3: 7
Obs running: 8
Sub3: 8
Obs running: 9
Sub3: 9
Obs running: 10
Sub3: 10
Obs running: 11
Sub3: 11
Obs running: 12
Sub3: 12
Obs running: 13
Sub3: 13
Obs running: 14
Sub3: 14