rx-scala

Controlling observable buffering with the observable itself


I'm trying to slice an observable stream using the stream itself, e.g.:

import rx.lang.scala.Observable

val source = Observable.from(1 to 10).share
val boundaries = source.filter(_ % 3 == 0)
val result = source.tumblingBuffer(boundaries)

result.subscribe((buf) => println(buf.toString))

The output is:

Buffer()
Buffer()
Buffer()
Buffer()

Presumably source is fully iterated as soon as boundaries subscribes to it, before result gets its own subscription. So only the boundaries and the resulting buffers are produced, and there is nothing left to fill the buffers with.

My workaround is to use publish/connect:

val source2 = Observable.from(1 to 10).publish
val boundaries2 = source2.filter(_ % 3 == 0)
val result2 = source2.tumblingBuffer(boundaries2)

result2.subscribe((buf) => println(buf.toString))
source2.connect

This produces the expected output:

Buffer(1, 2)
Buffer(3, 4, 5)
Buffer(6, 7, 8)
Buffer(9, 10)

Now I just need to hide connect from the outside world and call it when result gets subscribed (I'm doing this inside a class and don't want to expose it). Something like:

val source3 = Observable.from(1 to 10).publish
val boundaries3 = source3.filter(_ % 3 == 0)
val result3 = source3
          .tumblingBuffer(boundaries3)
          .doOnSubscribe(() => source3.connect)

result3.subscribe((buf) => println(buf.toString))

But now the doOnSubscribe action never gets called, so the published source never gets connected...

What's wrong?


Solution

  • You were on the right track with your publish solution. However, there is an alternative publish operator that takes a lambda of type Observable[T] => Observable[R] as its argument (see documentation). The argument of this lambda is the original stream, which you can safely subscribe to multiple times. Within the lambda you transform the original stream as you like; in your case, you filter the stream and buffer it on that filter.

    Observable.from(1 to 10)
        .publish(src => src.tumblingBuffer(src.filter(_ % 3 == 0)))
        .subscribe(buf => println(buf.toString()))
    

    The best thing about this operator is that you don't need to call anything like connect afterwards.
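Applied to the original goal of keeping connect hidden inside a class, a minimal sketch might look like the following. It assumes RxScala (rx.lang.scala.Observable) is on the classpath; the names StreamSlicer, buffered and StreamSlicerDemo are hypothetical, chosen only for illustration.

```scala
import rx.lang.scala.Observable

// Hypothetical wrapper: callers only see `buffered`, never a
// ConnectableObservable, so there is no connect to expose.
class StreamSlicer(source: Observable[Int]) {
  // publish(selector): every subscription made to `src` inside the lambda
  // shares one upstream subscription, and the connection is made
  // automatically when someone subscribes to the returned observable.
  val buffered: Observable[Seq[Int]] =
    source.publish(src => src.tumblingBuffer(src.filter(_ % 3 == 0)))
}

object StreamSlicerDemo {
  def main(args: Array[String]): Unit = {
    val slicer = new StreamSlicer(Observable.from(1 to 10))
    // Subscribing is all a caller does; no connect call is needed.
    slicer.buffered.subscribe(buf => println(buf.toString))
  }
}
```

This should print the same four buffers as the publish/connect version, while the class exposes only an ordinary Observable.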