I'm trying to slice an observable stream by itself, e.g.:
val source = Observable.from(1 to 10).share
val boundaries = source.filter(_ % 3 == 0)
val result = source.tumblingBuffer(boundaries)
result.subscribe((buf) => println(buf.toString))
The output is:
Buffer()
Buffer()
Buffer()
Buffer()
source is probably iterated on the boundaries line before it reaches result, so it only creates the boundaries and the resulting buffers, but there's nothing left to fill them with.
My approach to this is using publish/connect:
val source2 = Observable.from(1 to 10).publish
val boundaries2 = source2.filter(_ % 3 == 0)
val result2 = source2.tumblingBuffer(boundaries2)
result2.subscribe((buf) => println(buf.toString))
source2.connect
This produces the expected output:
Buffer(1, 2)
Buffer(3, 4, 5)
Buffer(6, 7, 8)
Buffer(9, 10)
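To spell out the slicing I'm after independently of Rx, here is a minimal plain-collections sketch. The helper name sliceAt is hypothetical and only models the expected result (a boundary element closes the current buffer and starts the next one); it is not how tumblingBuffer is implemented.

```scala
object SliceModel {
  // Hypothetical helper: split xs so that every element matching isBoundary
  // closes the current buffer and becomes the first element of the next one.
  def sliceAt[A](xs: Seq[A])(isBoundary: A => Boolean): Vector[Vector[A]] = {
    val (closed, open) =
      xs.foldLeft((Vector.empty[Vector[A]], Vector.empty[A])) {
        case ((done, current), x) =>
          if (isBoundary(x)) (done :+ current, Vector(x)) // close buffer, start a new one
          else (done, current :+ x)                       // keep filling the current buffer
      }
    closed :+ open // the last buffer is emitted when the input ends
  }

  def main(args: Array[String]): Unit =
    sliceAt(1 to 10)(_ % 3 == 0).foreach(println)
}
```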
Now I just need to hide connect from the outer world and call it when result gets subscribed (I am doing this inside a class and I don't want to expose it). Something like:
val source3 = Observable.from(1 to 10).publish
val boundaries3 = source3.filter(_ % 3 == 0)
val result3 = source3
.tumblingBuffer(boundaries3)
.doOnSubscribe(() => source3.connect)
result3.subscribe((buf) => println(buf.toString))
But now the doOnSubscribe action never gets called, so the published source never gets connected...
What's wrong?
You were on the right track with your publish solution. There is, however, an alternative publish operator that takes a lambda of type Observable[T] => Observable[R] as its argument (see documentation). The argument of this lambda is the original stream, to which you can safely subscribe multiple times. Within the lambda you transform the original stream to your liking; in your case you filter the stream and buffer it on that filter.
Observable.from(1 to 10)
.publish(src => src.tumblingBuffer(src.filter(_ % 3 == 0)))
.subscribe(buf => println(buf.toString()))
The best thing about this operator is that you don't need to call anything like connect afterwards.
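Since you mention doing this inside a class, here is a minimal sketch of how that could look, assuming RxScala (rx.lang.scala) is on the classpath. The class name BufferedSlices and its member names are hypothetical, chosen only for illustration; the only operators used are the publish(selector), filter and tumblingBuffer calls already shown above.

```scala
import rx.lang.scala.Observable

// Hypothetical wrapper class: only `result` is visible to callers,
// and there is no connect call to hide or expose.
class BufferedSlices(source: Observable[Int]) {
  // publish(selector) hands the lambda a shared view of `source`, so the
  // boundary filter and the buffered stream use the same underlying subscription.
  val result: Observable[Seq[Int]] =
    source.publish(src => src.tumblingBuffer(src.filter(_ % 3 == 0)))
}

object BufferedSlicesDemo {
  def main(args: Array[String]): Unit = {
    val slices = new BufferedSlices(Observable.from(1 to 10))
    // Subscribing to result is what starts the underlying source.
    slices.result.subscribe(buf => println(buf.toString))
  }
}
```

Each subscription to result sets up its own shared pass over the source, so callers can treat it like any other cold observable.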