
Akka Source from Iterator with blocking actions


The Akka documentation on Source.fromIterator (https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.html) says:

If the iterator perform[s] blocking operations, make sure to run it on a separate dispatcher.

I have an iterator that I want to construct an Akka Source from. However, hasNext can return true before the element that next will return is available. When that happens, the call to next must block until the element arrives. hasNext cannot return false in that situation, because Akka would treat the Source as finished and stop processing.
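For concreteness, here is a minimal sketch of an iterator of this kind. It assumes (the question does not say so) that the elements are handed over by another thread through a java.util.concurrent.BlockingQueue; QueueBackedIterator is a hypothetical name.

```scala
import java.util.concurrent.BlockingQueue

// Hypothetical stand-in for the iterator in the question: elements are handed over
// through a BlockingQueue that some other thread fills. This sketch never signals
// end of stream.
final class QueueBackedIterator[T](queue: BlockingQueue[T]) extends Iterator[T] {
  // Answers immediately: another element is always expected, even if it has not arrived yet.
  override def hasNext: Boolean = true

  // Blocks the calling thread until the producer has put the next element on the queue.
  override def next(): T = queue.take()
}
```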

Akka doesn't like waiting for the next element, of course: timeout exceptions pop up occasionally. So what should I do in this case? What does it mean to run such an iterator on a separate dispatcher? If I change next to return a Future (completed on a separate dispatcher), I get a Source[Future[T]] instead of a Source[T], which causes problems downstream unless I can somehow convert it into a Future[Source[T]].

Any suggestions?


Solution

  • As you note, if you turn the iterator into an Iterator[Future[T]], with something on another thread completing the futures, Source.fromIterator gives you a Source[Future[T]]. However, a Source[Future[T]] can always be turned into a Source[T] by attaching mapAsync with identity as the function (mapAsync also takes a parallelism argument, here 1):

    Source.fromIterator(...)  // Source[Future[T]]
      .mapAsync(1)(identity)  // Source[T]
    

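    mapAsync waits for each future without blocking a thread on the stream's dispatcher, and it emits results in upstream order. Its first argument is the parallelism; with 1, it waits for each future to complete before pulling the next one from the iterator.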

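    Alternatively, you can keep next blocking and run the source stage on a dedicated dispatcher, using ActorAttributes.dispatcher with withAttributes (this is what the documentation means by a separate dispatcher):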

    Source.fromIterator(...).withAttributes(ActorAttributes.dispatcher("config.path.to.dispatcher"))
    

    Note that this will introduce an async boundary (with its associated buffer) between the source and the downstream stages.
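Both approaches above can be sketched end to end as follows. This is a minimal sketch, not a definitive implementation, and it assumes Akka 2.6 or later (so an implicit ActorSystem supplies the Materializer). The slow producer, the pool size, and the dispatcher name blocking-iterator-dispatcher are all hypothetical, made up for illustration.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue}

import akka.actor.ActorSystem
import akka.stream.ActorAttributes
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object BlockingIteratorSketch {

  // Hypothetical dispatcher for the second approach; the name is made up for this sketch.
  private val config = ConfigFactory
    .parseString(
      """blocking-iterator-dispatcher {
        |  type = Dispatcher
        |  executor = "thread-pool-executor"
        |  thread-pool-executor.fixed-pool-size = 2
        |  throughput = 1
        |}""".stripMargin)
    .withFallback(ConfigFactory.load())

  // Hypothetical producer: puts 1..n on a queue from another thread, with a delay per element,
  // so a consumer calling take() blocks until each element arrives.
  private def slowProducer(n: Int): LinkedBlockingQueue[Int] = {
    val queue = new LinkedBlockingQueue[Int]()
    new Thread(() => (1 to n).foreach { i => Thread.sleep(50); queue.put(i) }).start()
    queue
  }

  // Runs `body` with a fresh ActorSystem and shuts the system down afterwards.
  private def withSystem[A](body: ActorSystem => A): A = {
    val system = ActorSystem("blocking-iterator-sketch", config)
    try body(system)
    finally Await.result(system.terminate(), 10.seconds)
  }

  // Approach 1: Iterator[Future[Int]] completed on a separate pool, flattened with mapAsync.
  def viaMapAsync(n: Int): Seq[Int] = withSystem { implicit system =>
    val pool = Executors.newFixedThreadPool(2)
    val blockingEc = ExecutionContext.fromExecutorService(pool)
    val queue = slowProducer(n)
    try {
      // next() returns immediately; the blocking take() runs on blockingEc.
      def futures: Iterator[Future[Int]] = Iterator.fill(n)(Future(queue.take())(blockingEc))
      val done = Source
        .fromIterator(() => futures) // Source[Future[Int]]
        .mapAsync(1)(identity)       // Source[Int]; one take() in flight at a time
        .runWith(Sink.seq)
      Await.result(done, 10.seconds)
    } finally pool.shutdown()
  }

  // Approach 2: the iterator blocks inside next(), but its stage runs on its own dispatcher.
  def viaDispatcher(n: Int): Seq[Int] = withSystem { implicit system =>
    val queue = slowProducer(n)
    val done = Source
      .fromIterator(() => Iterator.fill(n)(queue.take())) // next() blocks on take()
      .withAttributes(ActorAttributes.dispatcher("blocking-iterator-dispatcher"))
      .runWith(Sink.seq)
    Await.result(done, 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    println(viaMapAsync(5))
    println(viaDispatcher(5))
  }
}
```

The first approach keeps all blocking off Akka's dispatchers entirely; the second keeps the iterator unchanged and confines the blocking to a small pool reserved for it. Await is used here only to keep the sketch short; in a real stream you would normally compose the resulting futures instead.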