scala twitter-util

Forcing an AsyncStream[A] into a Seq[A] in Scala


I am using Twitter's AsyncStream and need to collect the results of concurrent processing into a Seq, but the only approach I've managed to get working is horrendous. This feels like it should be a one-liner, but all my attempts with Await and force have either hung or not processed the work.

Here's what I have working - what's a more idiomatic way of doing this?

  def processWork(work: AsyncStream[Work]): Seq[Result] = {
    // TODO: make less stupid
    val resultStream = work.flatMap { processWork }
    var results : Seq[Result] = Nil
    resultStream.foreach {
      result => {
        results = results :+ result
      }
    }
    results
  }

Solution

  • As @MattFowler pointed out, you can force the stream and wait until it's complete using:

    Await.result(resultStream.toSeq, 1.second)
    

    toSeq starts realizing the stream and returns a Future[Seq[A]] that completes once all the elements are resolved, as documented in the AsyncStream API docs.

    You can then block until the future completes by using Await.result.

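    Make sure your stream is finite! On an infinite stream the future returned by toSeq never completes, so Await.result(resultStream.toSeq, 1.second) cannot produce a result; at best it fails with a TimeoutException once the timeout expires, and without a timeout it would block forever.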
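
    Putting this together, the method from the question could look roughly like the sketch below. Work, Result and processOne are hypothetical stand-ins: the question does not show its per-item processWork overload, so this assumes it maps one Work item to an AsyncStream[Result]. The 5-second timeout is an arbitrary choice.

```scala
import com.twitter.concurrent.AsyncStream
import com.twitter.util.{Await, Duration, Future}

// Hypothetical stand-ins for the question's Work and Result types.
final case class Work(id: Int)
final case class Result(value: String)

object ForceStream {
  // Hypothetical per-item step (assumed shape): one Work item
  // becomes a stream of results.
  def processOne(w: Work): AsyncStream[Result] =
    AsyncStream.fromFuture(Future.value(Result(s"done-${w.id}")))

  // toSeq() forces the stream and yields a Future[Seq[Result]];
  // Await.result blocks until that future completes or the timeout expires.
  def processWork(work: AsyncStream[Work]): Seq[Result] =
    Await.result(work.flatMap(processOne).toSeq(), Duration.fromSeconds(5))
}
```

    If the caller can work with a Future, returning work.flatMap(processOne).toSeq() directly and dropping the Await avoids blocking a thread.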