I'm trying to implement paging using Akka Streams. Currently I have:
/**
 * Example payload carrying an id and optional paging metadata.
 *
 * @param id        numeric identifier of the object
 * @param next_page optional string-to-string map; presumably describes the next page
 *                  (e.g. a "uri" entry) -- TODO confirm against the API
 */
case class SomeObject(
  id: Long,
  next_page: Option[Map[String, String]]
)
/**
 * Unfold step for paging: fetches the page at `uri` and returns the URI of the
 * following page together with the data of the fetched page.
 *
 * An empty `uri` ends the unfold (`None`). When the response has no `next_page`,
 * `Uri.Empty` is returned as the next state, so the page's data is still emitted
 * and the following call terminates.
 *
 * @param uri page to request; `Uri.Empty` means "no more pages"
 * @return `None` for an empty `uri`, otherwise `Some((nextUri, data))`; the future
 *         fails if the request or unmarshalling fails, or if `next_page` has no
 *         "uri" entry
 */
def chainRequests(uri: Uri): Future[Option[(Uri, T)]] =
  if (uri.isEmpty) Future.successful(None)
  else {
    val response: Future[Response[T]] =
      sendWithRetry(prepareRequest(HttpMethods.GET, uri)).flatMap(unmarshal)
    response.map { resp =>
      // The type ascription keeps whatever conversion to Uri the original relied on,
      // without depending on auto-tupling of the `Some(...)` arguments.
      val nextUri: Uri = resp.next_page match {
        case Some(nextPage) => nextPage("uri")
        case None           => Uri.Empty
      }
      Some((nextUri, resp.data))
    }
  }
// `someObject` stands for an existing SomeObject instance; the companion object itself has no `id`.
Source.single(someObject)
  // A `_` placeholder cannot be used inside an interpolated string, so the parameter is named.
  .map(obj => Uri(s"object/${obj.id}"))
  // flatMapConcat flattens the per-URI stream of pages; a plain `map` would emit the inner Source itself.
  .flatMapConcat(uri => Source.unfoldAsync(uri)(chainRequests))
  // .map(...) -- some processing goes here
The problem is that if I do source.take(1000) and the paging has a lot of elements (pages), then downstream does not get new elements until Source.unfoldAsync finishes.
I tried to use a cycle in a Flow graph, like this:
// Fragment of a GraphDSL builder body: `builder` is the GraphDSL.Builder in scope, and
// `sendRequest` / `extractNextUri` are flows defined elsewhere
// (presumably Uri => Response[T] and Response[T] => Uri -- TODO confirm).
import akka.stream.scaladsl.MergePreferred

val in = builder.add(Flow[Uri])
val out = builder.add(Flow[T])
// Port 1 receives responses that announce another page, port 0 the ones that do not.
val partition = builder.add(
  Partition[Response[T]](2, r => if (r.next_page.isDefined) 1 else 0)
)
// The merge sits in front of `sendRequest`, so it carries Uris, and `.preferred`
// is only available on MergePreferred (one regular input plus the preferred one).
val mergeUri = builder.add(MergePreferred[Uri](1))

in ~> mergeUri ~> sendRequest ~> partition
// Feedback edge: the next page's URI is fed back into the request loop.
mergeUri.preferred <~ extractNextUri <~ partition.out(1)
// NOTE(review): only responses without a next page reach `out`; the data of every
// response routed to port 1 is never emitted. A Broadcast (data to `out`, next URI
// to the feedback edge) is probably what is wanted -- confirm.
partition.out(0) ~> Flow[Response[T]].map(_.data) ~> out
FlowShape(in.in, out.out)
But the above code does not work.
I'm stuck on creating my own GraphStage. unfoldAsync takes a first element, but with the Flow solution I don't have a "first" element. Any suggestions?
Thanks
I found a solution by writing my own GraphStage:
/**
 * Flow stage that expands every incoming page key into the elements produced by
 * repeatedly applying `f`, in the manner of `unfoldAsync` but driven by upstream keys.
 *
 * For the current key, `f` is called once per downstream demand:
 *  - `Some((next, elem))` emits `elem`; a non-empty `next` becomes the key for the
 *    following demand, while an empty `next` ends the pagination of the current
 *    upstream key and the stage moves on to the next upstream element;
 *  - `None` completes the whole stage without emitting anything;
 *  - a failed future fails the stage.
 *
 * The stage completes once upstream has finished and the pagination of the last
 * key has ended. Upstream is pulled only when the stage is idle and downstream has
 * demand, so at most one call to `f` is outstanding at any time.
 *
 * @param f  fetches one page: maps a key to the next key and the element to emit
 * @param ec execution context on which the future's completion callback is registered
 * @tparam S key type; `isEmpty` on it marks the end of a pagination
 * @tparam E emitted element type
 */
final class PaginationGraphStage[S <: Uri, E](f: S => Future[Option[(S, E)]])(implicit ec: ExecutionContextExecutor)
  extends GraphStage[FlowShape[S, E]] {

  val in: Inlet[S] = Inlet[S]("PaginationGraphStage.in")
  val out: Outlet[E] = Outlet[E]("PaginationGraphStage.out")
  override val shape: FlowShape[S, E] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler with InHandler {

      // Key of the page to fetch on the next demand; None while waiting for an upstream key.
      private[this] var current: Option[S] = None

      /** Handles the outcome of one call to `f`; always runs inside the stage. */
      private def handleResult(result: Try[Option[(S, E)]]): Unit = result match {
        // failStage / completeStage also close the inlet, so upstream is not left dangling.
        case Failure(ex)   => failStage(ex)
        case Success(None) => completeStage()
        case Success(Some((next, elem))) =>
          push(out, elem)
          if (next.isEmpty) finishCurrentKey() else current = Some(next)
      }

      /** Ends the pagination of the current key; completes if no further key can arrive. */
      private def finishCurrentKey(): Unit = {
        current = None
        // With upstream still open, the next key is requested on the next downstream pull.
        if (isClosed(in)) completeStage()
      }

      private val resultCallback = getAsyncCallback[Try[Option[(S, E)]]](handleResult)
      private val invokeResultCallback: Try[Option[(S, E)]] => Unit = resultCallback.invoke

      /** Requests one page; only called while downstream demand is outstanding. */
      private def fetch(key: S): Unit = {
        val future = f(key)
        future.value match {
          // Already completed: handle it inline instead of going through the callback.
          case Some(result) => handleResult(result)
          case None         => future.onComplete(invokeResultCallback)
        }
      }

      override def onPull(): Unit = current match {
        case Some(key) => fetch(key)
        case None      => if (!hasBeenPulled(in)) tryPull(in)
      }

      override def onPush(): Unit = {
        // An element only arrives after onPull requested it, so demand is still outstanding.
        val key = grab(in)
        current = Some(key)
        fetch(key)
      }

      override def onUpstreamFinish(): Unit =
        // A pagination in progress is allowed to finish; finishCurrentKey completes the stage afterwards.
        if (current.isEmpty) completeStage()

      setHandlers(in, out, this)
    }
}