Given a queue like so:
val queue: Queue[Int] = async.boundedQueue[Int](1000)
I want to pull off this queue and stream it into a downstream Sink, in chunks of UP to 100.
queue.dequeue.chunk(100).to(downstreamConsumer)
works sort of, but it will not empty the queue if I have say 101 messages. There will be 1 message left over, unless another 99 are pushed in. I want to take as many as I can off the queue up to 100, as fast as my downstream process can handle.
Is there an existing combinator available?
I actually solved this a different way then I had intended.
The scalaz-stream queue now contains a dequeueBatch
method that allows dequeuing all values in the queue, up to N, or blocks.