
Splitting a scalaz-stream process into two child streams


Using scalaz-stream, is it possible to split/fork and then rejoin a stream?

As an example, let's say I have the following code:

val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)

val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
val sumOfOddNumbers  = streamOfNumbers.filter(isOdd).fold(0)(add)

sumOfEvenNumbers.zip(sumOfOddNumbers).to(someEffectfulFunction)

With scalaz-stream, this example produces the result you would expect: a tuple holding the sum of the even numbers and the sum of the odd numbers from 1 to 10, i.e. (30, 25), is passed to a sink.

However, if we replace streamOfNumbers with something that requires IO, the IO action is executed twice, once for each derived stream.
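The double execution can be pictured without any streaming library. The sketch below is only a plain-Scala model, not scalaz-stream: `EffectfulSource`, `open` and `timesOpened` are hypothetical names, and calling `open()` stands in for running the source's IO action from scratch.

```scala
// Plain-Scala model of the problem; EffectfulSource and its members are
// hypothetical names, not part of scalaz-stream.
object EffectfulSource {
  // Counts how many times the "IO action" behind the source has run.
  var timesOpened: Int = 0

  // Every call stands for running the source's effect from scratch.
  def open(): Iterator[Int] = {
    timesOpened += 1
    (1 to 10).iterator
  }

  // Two independent pipelines, each pulling from its own run of the source.
  def sumOfEvenNumbers(): Int = open().filter(_ % 2 == 0).sum
  def sumOfOddNumbers(): Int  = open().filter(_ % 2 != 0).sum

  // Combining the two results forces both pipelines, so the source runs twice.
  def zipped(): (Int, Int) = (sumOfEvenNumbers(), sumOfOddNumbers())
}
```

After one call to `EffectfulSource.zipped()`, `timesOpened` is 2: each pipeline opened the source on its own, which is the behaviour I would like to avoid.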

Using a Topic I'm able to create a pub/sub process that duplicates elements in the stream correctly. However, it does not buffer - it simply consumes the entire source as fast as possible, regardless of the pace at which the sinks consume it.

I can wrap this in a bounded Queue, but the end result feels a lot more complex than it needs to be.

Is there a simpler way of splitting a stream in scalaz-stream without duplicate IO actions from the source?


Solution

  • To clarify, the previous answer deals with the "splitting" requirement. Your specific problem may be solvable without splitting the stream at all:

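    // Imports assumed for scalaz 7.x with scalaz-stream.
    import scalaz.\/
    import scalaz.\/.{left, right}
    import scalaz.concurrent.Task
    import scalaz.stream._

    val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
    // Tag each number: even on the right (O side), odd on the left (W side).
    val oddOrEven: Process[Task,Int\/Int] = streamOfNumbers.map {
       case even if even % 2 == 0 => right(even)
       case odd => left(odd)
    }
    // sump1 is not defined here; it stands for a summing Process1[Int,Int].
    val summed = oddOrEven.pipeW(sump1).pipeO(sump1)

    val evenSink: Sink[Task,Int] = ???
    val oddSink: Sink[Task,Int] = ???

    // drainW consumes the left (odd) side; to consumes the remaining right (even) side.
    summed
    .drainW(oddSink)
    .to(evenSink)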
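The idea behind this, tagging each element and summing both sides in one traversal, can be modelled in plain Scala. This is only a sketch using the standard library's `Either` in place of `\/`, not the scalaz-stream API; `SinglePassSums`, `tag` and `sums` are hypothetical names.

```scala
// Plain-Scala model of the single-pass idea; SinglePassSums is a hypothetical name.
object SinglePassSums {
  // Tag each number: odd on the left, even on the right (mirrors Int \/ Int).
  def tag(n: Int): Either[Int, Int] = if (n % 2 == 0) Right(n) else Left(n)

  // One traversal of the source, keeping a running (oddSum, evenSum) pair.
  def sums(source: Iterator[Int]): (Int, Int) =
    source.map(tag).foldLeft((0, 0)) {
      case ((oddSum, evenSum), Left(odd))   => (oddSum + odd, evenSum)
      case ((oddSum, evenSum), Right(even)) => (oddSum, evenSum + even)
    }
}
```

Because the iterator is consumed exactly once, whatever effect sits behind the source would also run only once, and `SinglePassSums.sums((1 to 10).iterator)` yields both sums from that single pass.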