The Akka documentation on Source.fromIterator (https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.html) says:
If the iterator perform[s] blocking operations, make sure to run it on a separate dispatcher.
I have an iterator that I want to construct an Akka Source from. However, it is possible for hasNext to return true before the element returned by next is available. If that happens, then the next call must block, waiting for the element. Note that hasNext cannot return false in that situation, since Akka would think the Source is done and stop processing.
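To make the scenario concrete, here is a minimal, hypothetical sketch of such an iterator in plain Scala (no Akka). It assumes the producer delivers elements through a java.util.concurrent.LinkedBlockingQueue and that the total number of elements is known up front, which lets hasNext answer immediately while next() blocks in take(). CountedBlockingIterator and BlockingIteratorDemo are illustrative names, not part of any library.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical iterator: it knows how many elements will eventually arrive,
// so hasNext answers immediately, but next() blocks in take() until a
// producer thread has actually enqueued the element.
final class CountedBlockingIterator[T](queue: LinkedBlockingQueue[T], expected: Int)
    extends Iterator[T] {
  private var delivered = 0

  def hasNext: Boolean = delivered < expected

  def next(): T = {
    if (!hasNext) throw new NoSuchElementException("no more elements")
    val elem = queue.take() // blocks the calling thread until an element is available
    delivered += 1
    elem
  }
}

object BlockingIteratorDemo {
  def run(): List[Int] = {
    val queue = new LinkedBlockingQueue[Int]()
    val it = new CountedBlockingIterator(queue, expected = 3)

    // Producer that supplies each element later than hasNext promises it.
    val producer: Runnable = () => {
      for (i <- 1 to 3) {
        Thread.sleep(50)
        queue.put(i)
      }
    }
    val producerThread = new Thread(producer)
    producerThread.start()

    val out = it.toList // each next() may block for roughly 50 ms
    producerThread.join()
    out
  }

  def main(args: Array[String]): Unit = println(run())
}
```

When Source.fromIterator drives an iterator like this, each take() call ties up whichever thread the stream stage happens to be running on.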
Akka doesn't like waiting for the next element, of course: timeout exceptions pop up occasionally. So what do I do in this case? What does it mean to run such an iterator on a separate dispatcher? If I wrap the code in next so that it returns a Future (using a separate dispatcher for that), then I get a Source[Future[T]] instead of a Source[T], which creates problems downstream unless I somehow convert it to a Future[Source[T]].
Any suggestions?
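As you note, if you transform the iterator into an Iterator[Future[T]], with something on another thread completing the futures, Source.fromIterator will give you a Source[Future[T]]. However, a Source[Future[T]] can always be turned into a Source[T] by attaching mapAsync(1)(identity) (mapAsync takes a parallelism argument before the function):
Source.fromIterator(...) // Source[Future[T]]
  .mapAsync(1)(identity) // Source[T]
mapAsync does not block a thread on the stream's dispatcher while a future is pending; it emits each result once its future completes, keeping elements in upstream order. A parallelism greater than 1 lets it wait on several futures at once.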
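A self-contained sketch of the mapAsync approach, assuming Akka 2.6 or later (where an implicit ActorSystem is enough to materialize a stream). The vector of promises and the producer running on its own single-thread pool are hypothetical stand-ins for "something on another thread completing the futures"; MapAsyncSketch is an illustrative name.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}

object MapAsyncSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("map-async-sketch")
    // Separate pool where the (hypothetical) producer is allowed to block.
    val producerEc = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
    try {
      // One promise per element; iterating over their futures never blocks.
      val promises = Vector.fill(5)(Promise[Int]())

      // Stand-in for the thread that eventually makes each element available.
      Future {
        promises.zipWithIndex.foreach { case (p, i) =>
          Thread.sleep(20) // simulate the element arriving late
          p.success(i * 10)
        }
      }(producerEc)

      val done = Source
        .fromIterator(() => promises.iterator.map(_.future)) // Source[Future[Int], NotUsed]
        .mapAsync(1)(identity)                               // Source[Int, NotUsed]
        .runWith(Sink.seq)

      Await.result(done, 5.seconds)
    } finally {
      producerEc.shutdown()
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```

Because the futures are handed out immediately and completed elsewhere, the stream stage itself never calls anything that blocks; mapAsync(1) waits asynchronously for each future in turn.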
You can also set a stage to run on a specific dispatcher using ActorAttributes.dispatcher and withAttributes:
Source.fromIterator(...).withAttributes(ActorAttributes.dispatcher("config.path.to.dispatcher"))
Note that this will introduce an async boundary (with its associated buffer) between the source and the downstream.
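A sketch of the dispatcher approach, again assuming Akka 2.6 or later. The dispatcher is defined inline with ConfigFactory.parseString so the example is self-contained; in a real project it would normally live in application.conf. The name blocking-iterator-dispatcher, the pool size, and the sleeping iterator are assumptions made for illustration, following the thread-pool-executor layout the Akka dispatcher documentation uses for blocking work.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorAttributes
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration._

object DedicatedDispatcherSketch {
  // Hypothetical dispatcher reserved for blocking work; name and size are assumptions.
  private val config = ConfigFactory
    .parseString(
      """
        |blocking-iterator-dispatcher {
        |  type = Dispatcher
        |  executor = "thread-pool-executor"
        |  thread-pool-executor {
        |    fixed-pool-size = 4
        |  }
        |  throughput = 1
        |}
        |""".stripMargin
    )
    .withFallback(ConfigFactory.load())

  // Stand-in for an iterator whose next() blocks while waiting for data.
  private def blockingIterator(): Iterator[Int] =
    Iterator.range(1, 4).map { i =>
      Thread.sleep(50)
      i
    }

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("dispatcher-sketch", config)
    try {
      val done = Source
        .fromIterator(() => blockingIterator())
        // The dispatcher id is the config path defined above.
        .withAttributes(ActorAttributes.dispatcher("blocking-iterator-dispatcher"))
        .runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```

Here the blocking next() calls run on the dedicated pool's threads rather than on the default dispatcher, at the cost of the async boundary noted above.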