I am using Twitter's AsyncStream and need to collect the results of concurrent processing into a Seq, but the only approach I've managed to get working is horrendous. This feels like it should be a one-liner, but all my attempts with Await and force have either hung or not processed the work.
Here's what I have working - what's a more idiomatic way of doing this?
def processWork(work: AsyncStream[Work]): Seq[Result] = {
  // TODO: make less stupid
  val resultStream = work.flatMap { processWork }
  var results: Seq[Result] = Nil
  resultStream.foreach { result =>
    results = results :+ result
  }
  results
}
As @MattFowler pointed out, you can force the stream and wait until it's complete using:
Await.result(resultStream.toSeq, 1.second)
toSeq will start realizing the stream and return a Future[Seq[A]] that completes once all the elements are resolved, as its documentation describes.
You can then block until the future completes by using Await.result.
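Putting the two calls together, the whole function can collapse to roughly one line. This is only a sketch: the Work and Result case classes and the processItem step are hypothetical stand-ins for whatever the question's real code does per item, and the 5-second timeout is an arbitrary choice.

```scala
import com.twitter.concurrent.AsyncStream
import com.twitter.util.{Await, Duration, Future}

object CollectStream {
  // Hypothetical stand-ins for the question's Work and Result types.
  case class Work(id: Int)
  case class Result(id: Int)

  // Hypothetical per-item step; the question does not show its real one.
  def processItem(work: Work): AsyncStream[Result] =
    AsyncStream.fromFuture(Future.value(Result(work.id * 2)))

  // toSeq() forces the stream and yields a Future[Seq[Result]];
  // Await.result blocks until that future completes or the timeout expires.
  def processWork(work: AsyncStream[Work]): Seq[Result] =
    Await.result(work.flatMap(processItem).toSeq(), Duration.fromSeconds(5))
}
```

Blocking like this is reasonable at the edge of a program (a main method or a test); inside code that already returns a Future, it is usually better to return the Future[Seq[Result]] from toSeq and let the caller decide when to wait.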
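Make sure your stream is finite! On an infinite stream the future returned by toSeq never completes, so Await.result(resultStream.toSeq, 1.second) can never return a result: at best it fails with a TimeoutException after one second, and without a timeout it would block forever.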
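To make the failure mode concrete, here is a small sketch of what the timeout does when the collected future never completes. The stuck stream is built from Future.never purely as a stand-in for a stream that never finishes; collectOrEmpty is a hypothetical helper, and falling back to an empty Seq is just one possible policy.

```scala
import com.twitter.concurrent.AsyncStream
import com.twitter.util.{Await, Duration, Future, TimeoutException}

object TimeoutDemo {
  // Stand-in for a stream that never finishes: its only element
  // comes from a future that is never satisfied.
  val stuck: AsyncStream[Int] = AsyncStream.fromFuture(Future.never)

  // Hypothetical helper: collect the stream, or give up after 100 ms.
  def collectOrEmpty(stream: AsyncStream[Int]): Seq[Int] =
    try Await.result(stream.toSeq(), Duration.fromMilliseconds(100))
    catch { case _: TimeoutException => Seq.empty }
}
```

Note that the timeout only helps once Await.result is actually reached; it does not bound work done while the stream is being forced.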