
Reactive streams (monix) operator combination for buffering for a timespan with overlapping elements


I have an Observable[T] that emits continuously, and I want an Observable[List[T]] that, for each element the source emits, emits the elements received within a specified duration. For example:

Observable.range(0, 100)
    .delayOnNext(1.second) // one element per second
    // -- add here some operator combination that takes a timespan of 3 seconds

//should output:
// t0: List(0)
// t+1s: List(0,1)
// t+2s: List(0,1,2)
// t+3s: List(1,2,3)
// t+4s: List(2,3,4)

Note that the emitted List contains the elements accumulated within the specified duration, and a List is emitted for every source item. The bufferTimed operator does not emit on every source item.

I am thinking of implementing an operator similar to monix.reactive.internal.operators.BufferTimedObservable, but with the logic described above. However, I don't want to put in that effort if there is an easier way.
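To pin down the requested semantics independently of Monix, here is a minimal pure-Scala model that works on explicit timestamps (whole seconds as `Long`) instead of a real clock. The names `SlidingTailModel` and `slidingTail` are hypothetical, and the model is deliberately naive (quadratic). One assumption is worth stating: reproducing the `t+3s: List(1,2,3)` line requires a half-open window, i.e. an element exactly `timespan` old is already dropped.

```scala
object SlidingTailModel {
  /** For each (timestamp, element) pair, in arrival order, returns the
    * elements seen so far whose timestamp lies in (now - timespan, now],
    * where `now` is the timestamp of the current element.
    */
  def slidingTail[A](events: IndexedSeq[(Long, A)], timespan: Long): IndexedSeq[List[A]] =
    events.indices.map { i =>
      val now = events(i)._1
      events
        .take(i + 1) // only the elements that have arrived so far
        .collect { case (t, a) if now - t < timespan => a } // half-open window
        .toList
    }

  def main(args: Array[String]): Unit = {
    // As in the question: element n arrives at second n.
    val events = (0 until 5).map(n => (n.toLong, n))
    slidingTail(events, timespan = 3).zipWithIndex.foreach { case (xs, t) =>
      println(s"t+${t}s: $xs")
    }
  }
}
```

Running `main` prints one line per source element, matching the five expected lines in the question.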


Solution

  • Your own solution is perfectly fine, but I wanted to propose a minor improvement:

    import monix.reactive._
    import scala.concurrent.duration._
    import scala.collection.mutable
    import scala.jdk.DurationConverters.JavaDurationOps
    import java.time
    
    def tailTimed[A](
        source: Observable[A],
        timespan: FiniteDuration,
    ): Observable[Seq[A]] = {
      source
        .scan(mutable.ArrayDeque.empty[(time.Instant, A)])((deque, next) => {
          val timestamp = time.Instant.now()
          deque.dropWhileInPlace {
            case (t, _) =>
              time.Duration.between(t, timestamp).toScala > timespan
          }
          deque += timestamp -> next
        })
        .map(_.view.map(_._2).toIndexedSeq)
    }
    

    It is mostly equivalent, but internally it uses an ArrayDeque for fast access to both ends of the queue. Since older elements are always at the front, expired ones can be dropped without scanning the whole collection, and new elements can be appended efficiently.
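    The "only the front needs to be inspected" argument can be made concrete. The sketch below (hypothetical names, standard library only, timestamps as plain `Long` values) writes the same trimming as an explicit loop over `head`/`removeHead()` and counts how many entries are examined. Because timestamps are assumed non-decreasing from front to back, the loop stops at the first element still inside the window, so its cost is proportional to the number of expired elements rather than to the size of the deque. It uses the same `>` comparison as the code above.

```scala
import scala.collection.mutable

object FrontTrimDemo {
  /** Removes expired (timestamp, element) pairs from the front of `deque`
    * and returns how many entries were inspected. Assumes timestamps are
    * non-decreasing from front to back (true when appending in arrival order).
    */
  def trimFront[A](deque: mutable.ArrayDeque[(Long, A)], now: Long, timespan: Long): Int = {
    var inspected = 0
    var done = false
    while (!done && deque.nonEmpty) {
      inspected += 1
      if (now - deque.head._1 > timespan) {
        deque.removeHead() // expired: drop it
      } else {
        done = true // first live element: everything behind it is newer
      }
    }
    inspected
  }

  def main(args: Array[String]): Unit = {
    // 1000 entries with timestamps 0..999
    val deque = mutable.ArrayDeque.from((0L until 1000L).map(t => (t, t.toInt)))
    val inspected = trimFront(deque, now = 1000L, timespan = 995L)
    println(s"inspected=$inspected remaining=${deque.size}")
  }
}
```

    Here only the five entries with timestamps 0 to 4 are expired, so six entries are inspected (five removed plus the first live one) out of a thousand.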

    Out of curiosity, I also implemented this as its own Observable:

    import monix.execution.{Ack, Cancelable, Scheduler}
    import monix.reactive.Observable
    import monix.reactive.observers.Subscriber
    
    import java.time.Instant
    import scala.collection.mutable
    import scala.concurrent.Future
    import scala.concurrent.duration.{Duration, FiniteDuration}
    import scala.jdk.DurationConverters.ScalaDurationOps
    
    private final class TailTimedObservable[+A](
        source: Observable[A],
        timespan: FiniteDuration,
    ) extends Observable[Seq[A]] {
    
      require(timespan > Duration.Zero, "timespan must be strictly positive")
    
      def unsafeSubscribeFn(out: Subscriber[Seq[A]]): Cancelable = {
        source.unsafeSubscribeFn(new Subscriber[A] {
          implicit val scheduler: Scheduler = out.scheduler
    
          // Convert once instead of on every comparison.
          private[this] val window = timespan.toJava
          private[this] val deque = mutable.ArrayDeque.empty[(Instant, A)]
    
          override def onNext(elem: A): Future[Ack] = {
            val tElem = Instant.now()
            // Drop elements older than `timespan`; they are always at the front.
            // compareTo is used because Duration.isPositive needs Java 18+.
            deque.dropWhileInPlace {
              case (t, _) =>
                java.time.Duration.between(t, tElem).compareTo(window) > 0
            }
            deque += ((tElem, elem))
            // Emit a copy and return the downstream ack, so that back-pressure
            // (and a downstream Stop) propagates to the source.
            out.onNext(deque.view.map(_._2).toIndexedSeq)
          }
    
          override def onError(ex: Throwable): Unit = out.onError(ex)
    
          override def onComplete(): Unit = out.onComplete()
        })
      }
    }
    

    Since this only emits in response to source elements and never needs a periodic (timer-driven) action, it is a lot simpler than BufferTimedObservable.

    There is a lot of room for optimization here (mainly avoiding copying the deque before emitting it to the consumer), as well as for testability improvements (e.g. accepting a Clock as a parameter instead of calling Instant.now()).
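    As a sketch of the testability point, assume the windowing logic is factored out of the Observable into a small stateful helper that reads the time from an injected `java.time.Clock`. A test can then move time manually instead of sleeping. `TailWindow` and `ManualClock` are hypothetical names, not Monix API; `onNext` would call `push` in place of doing the `Instant.now()` bookkeeping itself.

```scala
import java.time.{Clock, Duration, Instant, ZoneId, ZoneOffset}
import scala.collection.mutable

/** Hypothetical helper: keeps the elements pushed within `timespan`, per `clock`. */
final class TailWindow[A](timespan: Duration, clock: Clock) {
  private val deque = mutable.ArrayDeque.empty[(Instant, A)]

  /** Adds `elem` and returns a snapshot of the elements still in the window. */
  def push(elem: A): IndexedSeq[A] = {
    val now = clock.instant()
    // Same rule as above: drop entries strictly older than `timespan`.
    deque.dropWhileInPlace { case (t, _) => Duration.between(t, now).compareTo(timespan) > 0 }
    deque += ((now, elem))
    deque.view.map(_._2).toIndexedSeq
  }
}

/** Hypothetical test clock whose time only moves when `advance` is called. */
final class ManualClock(private var current: Instant) extends Clock {
  def advance(d: Duration): Unit = current = current.plus(d)
  override def instant(): Instant = current
  override def getZone(): ZoneId = ZoneOffset.UTC
  // The zone does not affect instant(); returning `this` keeps the sketch short.
  override def withZone(zone: ZoneId): Clock = this
}

object TailWindowDemo {
  def main(args: Array[String]): Unit = {
    val clock = new ManualClock(Instant.EPOCH)
    val window = new TailWindow[Int](Duration.ofSeconds(3), clock)
    (0 until 5).foreach { n =>
      println(window.push(n))
      clock.advance(Duration.ofSeconds(1))
    }
  }
}
```

    With `compareTo(timespan) > 0`, an element exactly `timespan` old is kept (so the fourth snapshot is 0, 1, 2, 3). Changing the comparison to `>= 0` drops it instead, which is what the question's `t+3s: List(1,2,3)` line implies.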

    While the first implementation is probably good enough for most use cases, the custom Observable puts you in a better position to make these improvements if they matter to you.