I have an Observable[T] that emits continuously, and I want an Observable[List[T]] that, for each element the source emits, emits the elements received within a specified trailing duration. Example:
Observable.range(0, 100)
  .delayExecution(1.second)
  // -- add here some operator combination which takes the parameter of 3 seconds
// should output:
// t0: List(0)
// t+1s: List(0,1)
// t+2s: List(0,1,2)
// t+3s: List(1,2,3)
// t+4s: List(2,3,4)
Note that the emitted List contains the elements accumulated over the specified duration, and is emitted on every source item. The bufferTimed operator does not emit on every source item.
I am thinking of implementing an operator similar to monix.reactive.internal.operators.BufferTimedObservable with the logic described above, but I don't want to put in that effort if there is an easier way.
Your own solution is perfectly fine, but I wanted to propose a minor improvement:
import monix.reactive._
import scala.concurrent.duration._
import scala.collection.mutable
import scala.jdk.DurationConverters.JavaDurationOps
import java.time

def tailTimed[A](
  source: Observable[A],
  timespan: FiniteDuration,
): Observable[Seq[A]] = {
  source
    // The accumulator is a deque of (arrival time, element), oldest first
    .scan(mutable.ArrayDeque.empty[(time.Instant, A)])((deque, next) => {
      val timestamp = time.Instant.now()
      // Evict entries from the front that are older than `timespan`
      deque.dropWhileInPlace {
        case (t, _) =>
          time.Duration.between(t, timestamp).toScala > timespan
      }
      deque += timestamp -> next
    })
    // Emit an immutable snapshot of the elements, without their timestamps
    .map(_.view.map(_._2).toIndexedSeq)
}
It is mostly equivalent, but internally uses an ArrayDeque to have fast access to both ends of the queue. Because older elements are always at the front, expired entries can be dropped without scanning the whole collection, and new elements can be appended efficiently.
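To make the eviction step concrete, here is a minimal sketch of the same logic outside of Monix, with timestamps passed in explicitly instead of read from Instant.now(). The names WindowSketch and step are hypothetical and only serve the illustration.

```scala
import java.time.{Duration, Instant}
import scala.collection.mutable

object WindowSketch {
  // Hypothetical helper: one step of the scan, with the timestamp supplied by the caller.
  def step[A](
    deque: mutable.ArrayDeque[(Instant, A)],
    now: Instant,
    elem: A,
    timespan: Duration
  ): IndexedSeq[A] = {
    // Entries are ordered oldest first, so eviction stops at the first entry still in range.
    deque.dropWhileInPlace { case (t, _) =>
      Duration.between(t, now).compareTo(timespan) > 0
    }
    deque += now -> elem
    // Immutable snapshot of the current window, without timestamps.
    deque.view.map(_._2).toIndexedSeq
  }

  def main(args: Array[String]): Unit = {
    val window = mutable.ArrayDeque.empty[(Instant, Int)]
    val t0 = Instant.EPOCH
    // One element per second, with a 3 second window.
    (0 to 4).foreach { i =>
      println(step(window, t0.plusSeconds(i), i, Duration.ofSeconds(3)))
    }
    // At t+3s the window is still (0, 1, 2, 3); at t+4s it becomes (1, 2, 3, 4).
  }
}
```

Because the comparison is strict, an element that is exactly `timespan` old is still kept. With real timestamps the measured ages will almost never land exactly on the boundary, so if the exact cut-off matters, it may be worth choosing the comparison deliberately.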
Out of curiosity, I also implemented this as its own Observable:
import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive.Observable
import monix.reactive.observers.Subscriber
import java.time.Instant
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.jdk.DurationConverters.ScalaDurationOps

private final class TailTimedObservable[+A](
  source: Observable[A],
  timespan: FiniteDuration,
) extends Observable[Seq[A]] {
  require(timespan > Duration.Zero, "timespan must be strictly positive")

  def unsafeSubscribeFn(out: Subscriber[Seq[A]]): Cancelable = {
    source.unsafeSubscribeFn(new Subscriber[A] {
      implicit val scheduler: Scheduler = out.scheduler
      // (arrival time, element) pairs, oldest first
      private[this] val deque = mutable.ArrayDeque.empty[(Instant, A)]

      override def onNext(elem: A): Future[Ack] = {
        val tElem = Instant.now()
        // Evict entries from the front that are older than `timespan`.
        // compareTo is used instead of Duration.isPositive, which requires Java 18+.
        deque.dropWhileInPlace {
          case (t, _) =>
            java.time.Duration
              .between(t, tElem)
              .compareTo(timespan.toJava) > 0
        }
        deque += ((tElem, elem))
        // Return the downstream acknowledgement so that back-pressure and Stop are respected
        out.onNext(deque.view.map(_._2).toIndexedSeq)
      }

      override def onError(ex: Throwable): Unit = out.onError(ex)
      override def onComplete(): Unit = out.onComplete()
    })
  }
}
Since this does not require a periodic action, it is a lot simpler than BufferTimedObservable.
It should be noted that there is a lot of optimization that could potentially be done here (mainly avoiding copying the deque before emitting it to the consumer), as well as testability improvements (e.g. accepting a Clock as a parameter instead of calling Instant.now()).
While the earlier implementation is probably more than good enough for most use cases, if these latter improvements are meaningful to you, the custom Observable probably puts you in a better position to achieve them.
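As a rough illustration of the Clock idea, the scan-based version could take a java.time.Clock and read the time from it. This is only a sketch: the enclosing object TailTimedWithClock and the `clock` parameter (with its default) are assumptions made for the example, not part of Monix.

```scala
import java.time.{Clock, Duration => JDuration, Instant}
import monix.reactive.Observable
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
import scala.jdk.DurationConverters.ScalaDurationOps

object TailTimedWithClock {
  // `clock` is an assumed extra parameter; by default it behaves like Instant.now().
  def tailTimed[A](
    source: Observable[A],
    timespan: FiniteDuration,
    clock: Clock = Clock.systemUTC()
  ): Observable[Seq[A]] = {
    val span = timespan.toJava
    source
      .scan(mutable.ArrayDeque.empty[(Instant, A)]) { (deque, next) =>
        // The only change from the original: time comes from the injected clock.
        val now = clock.instant()
        deque.dropWhileInPlace { case (t, _) =>
          JDuration.between(t, now).compareTo(span) > 0
        }
        deque += now -> next
      }
      .map(_.view.map(_._2).toIndexedSeq)
  }
}
```

In a test, a Clock subclass with a controllable instant() can then be passed in, which makes the emitted windows deterministic without sleeping.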