scala scalaz-stream

scalaz-stream queue without hanging


I have a two-part question, so let me give some background first. I know that it is possible to do something similar to what I want like this:

import scalaz.concurrent._
import scalaz.stream._

val q = async.unboundedQueue[Int]
val p: Process[Task, Int] = q.dequeue

q.enqueueAll(1 to 2).run

val p1: Process1[Int, Int] = process1.take(1)

p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// Answer: 1

p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// Answer: 2

p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// hangs awaiting next input

Is there some other p1 that I could use that would give me the output below without hanging (it would be like process1.awaitOption)?

Answer: Some(1)
Answer: Some(2)
Answer: None

If yes, I think it would be easy to answer the next question. Is there some other p1 that I could use that would give me the output below without hanging (it would be like process1.chunkAll)?

Answer: Seq(1, 2)
Answer: Seq()
Answer: Seq()

Edit:

To make the question easier to understand, suppose I have a loop like this:

for (i <- 1 to 4) {
  p.pipe(p1).map(x => println(s"Answer: $x")).run.run
}

The result could be:

Answer: Seq()
// if someone pushes some values into the queue, like: q.enqueueAll(1 to 2).run
Answer: Seq(1, 2)
Answer: Seq()
Answer: Seq()

I hope it's clear now what I am trying to do. The problem is that I don't control the loop, and I must not block it if there are no values in the queue.


Solution

  • Although I couldn't make Pavel's answer work the way I wanted, it was the turning point and I could use his advice to use the size signal.

    I'm posting my answer here in case anyone needs it:

    import scalaz.concurrent._
    import scalaz.stream._
    
    val q = async.unboundedQueue[Int]
    val p: Process[Task, Int] = q.size.continuous.take(1).flatMap { n => q.dequeue |> process1.take(n) }
    
    q.enqueueAll(1 to 2).run
    
    p.map(x => println(s"Answer: $x")).run.run
    // Answer: 1
    // Answer: 2
    
    p.map(x => println(s"Answer: $x")).run.run
    // not hanging awaiting next input
    
    p.map(x => println(s"Answer: $x")).run.run
    // not hanging awaiting next input
    
    q.enqueueAll(1 to 2).run
    
    p.map(x => println(s"Answer: $x")).run.run
    // Answer: 1
    // Answer: 2
    

    I realize that it's not exactly answering the question since I don't have an explicit p1, but it's fine for my purposes.
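
    For the Seq-shaped output from the question, the same size-signal idea could be wrapped in a small helper that collects whatever is queued into a sequence. This is only a sketch: NonBlockingDequeue, takeAvailable and headAvailable are hypothetical names, not part of scalaz-stream, and it assumes a single consumer. If something else dequeues between reading the size and taking the elements, the dequeue may still wait.

    ```scala
    import scalaz.concurrent.Task
    import scalaz.stream._

    // Hypothetical helpers, not part of scalaz-stream.
    object NonBlockingDequeue {

      // Read the current size once, then dequeue at most that many elements
      // (capped by `limit`) and collect them. When the size is 0, take(0)
      // halts without awaiting the queue, so the Task completes immediately.
      def takeAvailable[A](q: async.mutable.Queue[A],
                           limit: Int = Int.MaxValue): Task[IndexedSeq[A]] =
        q.size.continuous
          .take(1)
          .flatMap(n => q.dequeue |> process1.take(math.min(n, limit)))
          .runLog

      // awaitOption-like variant: the first queued element, if any.
      def headAvailable[A](q: async.mutable.Queue[A]): Task[Option[A]] =
        takeAvailable(q, limit = 1).map(_.headOption)
    }
    ```

    Inside the loop, NonBlockingDequeue.takeAvailable(q).run would then stand in for the "chunkAll"-style p1, and NonBlockingDequeue.headAvailable(q).run for the "awaitOption"-style one.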