I'd like to implement a Flow to handle paginated results. For example, the underlying service returns some results but also indicates that more are available through another request, which passes in something like a cursor.
Things I've done so far:
I have implemented the following flow and test, but the flow doesn't complete.
```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, UnzipWith, Zip}

object AdditionalRequestsFlow {
  // Pairs every Request with the Response that `flow` produced for it
  private def keepRequest[Request, Response](flow: Flow[Request, Response, NotUsed]): Flow[Request, (Request, Response), NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._
      val in = builder.add(Flow[Request])
      val bcast = builder.add(Broadcast[Request](2))
      val merge = builder.add(Zip[Request, Response]())
      in ~> bcast ~> merge.in0
            bcast ~> flow ~> merge.in1
      FlowShape(in.in, merge.out)
    })
  }

  def flow[Request, Response, Output](
    inputFlow: Flow[Request, Response, NotUsed],
    anotherRequest: (Request, Response) => Option[Request],
    extractOutput: Response => Output,
    mergeOutput: (Output, Output) => Output
  ): Flow[Request, Output, NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val start = b.add(Flow[Request])
      val merge = b.add(Merge[Request](2))
      val underlying = b.add(keepRequest(inputFlow))
      val unOption = b.add(Flow[Option[Request]].mapConcat(_.toList))
      val unzip = b.add(UnzipWith[(Request, Response), Response, Option[Request]] { case (req, res) =>
        (res, anotherRequest(req, res))
      })
      val finish = b.add(Flow[Response].map(extractOutput)) // this is wrong as we don't keep to 1 Request -> 1 Output, but first let's get the flow to work
      start ~> merge ~> underlying ~> unzip.in
      unzip.out0 ~> finish
      merge <~ unOption <~ unzip.out1 // feedback loop for follow-up requests
      FlowShape(start.in, finish.out)
    })
  }
}
```
The test:
```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.scalatest.FlatSpec
import org.scalatest.Matchers._
import cats.syntax.option._
import org.scalatest.concurrent.ScalaFutures.whenReady

class AdditionalRequestsFlowSpec extends FlatSpec {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  case class Request(max: Int, batchSize: Int, offset: Option[Int] = None)
  case class Response(values: List[Int], nextOffset: Option[Int])

  private val flow: Flow[Request, Response, NotUsed] = {
    Flow[Request]
      .map { request =>
        val start = request.offset.getOrElse(0)
        val end = Math.min(request.max, start + request.batchSize)
        val nextOffset = if (end == request.max) None else Some(end)
        val result = Response((start until end).toList, nextOffset)
        result
      }
  }

  "AdditionalRequestsFlow" should "collect additional responses" in {
    def anotherRequest(request: Request, response: Response): Option[Request] = {
      response.nextOffset.map { nextOffset => request.copy(offset = nextOffset.some) }
    }
    def extract(x: Response): List[Int] = x.values
    def merge(a: List[Int], b: List[Int]): List[Int] = a ::: b

    val requests =
      Request(max = 35, batchSize = 10) ::
      Request(max = 5, batchSize = 10) ::
      Request(max = 100, batchSize = 1) ::
      Nil

    val expected = requests.map { x =>
      (0 until x.max).toList
    }

    val future = Source(requests)
      .via(AdditionalRequestsFlow.flow(flow, anotherRequest, extract, merge))
      .runWith(Sink.seq)

    whenReady(future) { x =>
      x shouldEqual expected
    }
  }
}
```
I also implemented the same flow in a terrible, blocking way to illustrate what I'm trying to achieve:
```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

def uglyHackFlow[Request, Response, Output](
  inputFlow: Flow[Request, Response, NotUsed],
  anotherRequest: (Request, Response) => Option[Request],
  extractOutput: Response => Output,
  mergeOutput: (Output, Output) => Output
): Flow[Request, Output, NotUsed] = {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  Flow[Request]
    .map { x =>
      // Fetches one page, then recursively fetches and merges the remaining pages
      def grab(request: Request): Output = {
        val response = Await.result(Source.single(request).via(inputFlow).runWith(Sink.head), 10.seconds) // :(
        val another = anotherRequest(request, response)
        val output = extractOutput(response)
        another.map { another =>
          mergeOutput(output, grab(another))
        } getOrElse output
      }
      grab(x)
    }
}
```
This works, but we should not be materializing anything or Await-ing at this point.
I reviewed http://doc.akka.io/docs/akka/2.4/scala/stream/stream-graphs.html#Graph_cycles__liveness_and_deadlocks, which I believe contains the answer, but I cannot seem to find it there. In my case, I would expect the cycle to contain at most one element at a time, so neither buffer overflow nor complete starvation should occur, yet evidently something does go wrong.
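One likely reason the flow never completes: `Merge` with its default `eagerComplete = false` completes only after all of its inputs have completed. In a feedback loop, one of those inputs is fed by the `Merge`'s own output, so it can never complete. This is inferred from the documented `Merge` semantics and has not been traced through the exact graph above. The sketch below is a hypothetical `CycleDemo` unrelated to the question's types. It builds the smallest such loop: 3, 2, 1 and 0 reach the sink, but the sink's `Future` never completes. Switching to `eagerComplete = true` would not fix the question's flow, because the merge would then complete as soon as the outer source does and drop any follow-up requests still in flight.

```scala
import akka.stream.{ClosedShape, Materializer}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Future

object CycleDemo {
  // A feedback loop that counts down from 3; once the loop filters out -1,
  // nothing more flows, but Merge's feedback input is still open.
  def run()(implicit mat: Materializer): Future[immutable.Seq[Int]] =
    RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[Int]) { implicit b => sink =>
      import GraphDSL.Implicits._
      val merge = b.add(Merge[Int](2)) // eagerComplete = false by default
      val bcast = b.add(Broadcast[Int](2))
      val countDown = b.add(Flow[Int].map(_ - 1).filter(_ >= 0))

      Source.single(3) ~> merge ~> bcast ~> sink
      merge <~ countDown <~ bcast
      ClosedShape
    }).run()
}
```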
I tried to debug the stream using `.withAttributes(Attributes(LogLevels(...)))`, but it doesn't produce any output, even though the loggers seem to be configured correctly.
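As far as I know, the `LogLevels` attribute is read only by the `log` operator. Attaching it to other stages with `withAttributes` therefore produces no output by itself. A `.log(name)` stage is also needed, and that stage logs elements at Debug level by default. The sketch below follows the pattern from the Akka Streams logging docs. The `LogDemo` object and the `"after-map"` name are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object LogDemo {
  def run(): immutable.Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("log-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val done = Source(1 to 3)
        .map(_ * 2)
        // `log` reports elements, completion and failure through Akka logging
        .log("after-map")
        // raise element/finish levels so they show up at the default INFO loglevel
        .withAttributes(Attributes.logLevels(
          onElement = Logging.InfoLevel,
          onFinish = Logging.InfoLevel,
          onFailure = Logging.ErrorLevel))
        .runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

In the question's graph, a `.log("...")` stage could be placed on the feedback edge, for example, to see whether its elements and completion ever arrive.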
I'm looking for hints on how to fix the `flow` method while keeping the same signature and semantics, so that the test passes. Or perhaps I'm doing something completely off-base here (e.g., there is an existing feature in, say, akka-stream-contrib that solves this)?
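To keep the `flow` signature without any graph cycle, one option is to recurse with `flatMapConcat`. This is a different technique from the question's feedback loop, and the code below is an untested sketch rather than a known library feature. Each incoming request becomes a sub-`Source` that runs `inputFlow`. Only after a response arrives does the sketch decide whether to append the `Source` for the next page. This amounts to a non-blocking version of `uglyHackFlow`, and it restores the 1 Request -> 1 Output semantics by reducing each request's pages with `mergeOutput`.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

object AdditionalRequestsFlow {
  def flow[Request, Response, Output](
    inputFlow: Flow[Request, Response, NotUsed],
    anotherRequest: (Request, Response) => Option[Request],
    extractOutput: Response => Output,
    mergeOutput: (Output, Output) => Output
  ): Flow[Request, Output, NotUsed] = {

    // All responses for one initial request, in page order. The follow-up
    // Source is only built inside flatMapConcat, i.e. after the previous
    // response has arrived, so the recursion is lazy and there is no cycle.
    def pages(request: Request): Source[Response, NotUsed] =
      Source.single(request).via(inputFlow).flatMapConcat { response =>
        anotherRequest(request, response) match {
          case Some(next) => Source.single(response).concat(pages(next))
          case None       => Source.single(response)
        }
      }

    // One Output per incoming Request: fetch every page, then merge in order
    Flow[Request].flatMapConcat { request =>
      pages(request).map(extractOutput).reduce(mergeOutput)
    }
  }
}
```

Caveats of this sketch: `inputFlow` is materialized once per page, requests are processed one at a time (`flatMapMerge` would add parallelism but may reorder outputs), and `reduce` fails the stream with a `NoSuchElementException` if `inputFlow` emits nothing for a request.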
I think it's much safer to use `Source.unfold` (or its asynchronous variant `Source.unfoldAsync`) than to create custom graphs. Here is what I typically do, with minor variations depending on the API:
```scala
override def getArticles(lastTokenOpt: Option[String], filterIds: (Seq[Id]) => Seq[Id]): Source[Either[String, ImpArticle], NotUsed] = {
  val maxRows = 1000
  def getUri(cursor: String, count: Int) = s"/works?rows=$count&filter=type:journal-article&order=asc&sort=deposited&cursor=${URLEncoder.encode(cursor, "UTF-8")}"

  Source.unfoldAsync(lastTokenOpt.getOrElse("*")) { cursor =>
    println(s"Getting ${getUri(cursor, maxRows)}")
    if (cursor.nonEmpty) {
      sendGetRequest[CrossRefResponse[CrossRefList[JsValue]]](getUri(cursor, maxRows)).map {
        case Some(response) =>
          response.message match {
            case Left(list) if response.status == "ok" =>
              println(s"Got ${list.items.length} items")
              val items = list.items.flatMap { js =>
                try {
                  parseArticle(js)
                } catch {
                  case ex: Throwable =>
                    logger.error(s"Error on parsing: ${js.compactPrint}")
                    throw ex
                }
              }
              list.`next-cursor` match {
                case Some(nextCursor) =>
                  // emit the parsed items followed by the cursor, so it can be persisted
                  Some(nextCursor -> (items.map(Right.apply).toList ::: List(Left(nextCursor))))
                case None =>
                  logger.error(s"`next-cursor` is missing when fetching from CrossRef [status ${response.status}][${getUri(cursor, maxRows)}]")
                  // an empty cursor makes the next step return None, ending the stream
                  Some("" -> items.map(Right.apply).toList)
              }
            case Left(jsvalue) if response.status != "ok" =>
              logger.error(s"API error on fetching data from CrossRef [status ${response.status}][${getUri(cursor, maxRows)}]")
              None
            case Right(someError) =>
              val cause = someError.fold(errors => errors.map(_.message).mkString(", "), ex => ex.message)
              logger.error(s"API error on fetching data from CrossRef [status $cause][${getUri(cursor, maxRows)}]")
              None
          }
        case None =>
          logger.error(s"Got error on fetching ${getUri(cursor, maxRows)} from CrossRef")
          None
      }
    } else
      Future.successful(None)
  }.mapConcat(identity)
}
```
In your case you probably don't even need to push the cursor into the stream. I do that because I store the last successful cursor in a database so that I can resume later after a failure.
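Here is a stripped-down version of the same `Source.unfoldAsync` pattern, applied to the question's offset-based model and without emitting the cursor. The page type and `fetchPage` are hypothetical stand-ins for a real paged service. The unfold state is the offset of the next page, and `None` means there are no more pages.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}

object OffsetPaging {
  // Hypothetical page type mirroring the question's Response
  final case class Page(values: List[Int], nextOffset: Option[Int])

  // Hypothetical paged service: up to `batchSize` ints from `offset`, below `max`
  def fetchPage(max: Int, batchSize: Int, offset: Int): Future[Page] = {
    val end = math.min(max, offset + batchSize)
    Future.successful(Page((offset until end).toList, if (end == max) None else Some(end)))
  }

  def allValues(max: Int, batchSize: Int)(implicit ec: ExecutionContext): Source[Int, NotUsed] =
    Source
      .unfoldAsync[Option[Int], List[Int]](Some(0)) {
        case None =>
          Future.successful(None) // previous page was the last one: complete the stream
        case Some(offset) =>
          fetchPage(max, batchSize, offset).map(page => Some(page.nextOffset -> page.values))
      }
      .mapConcat(values => values) // flatten pages into individual elements
}
```

With a real service, `fetchPage` would wrap the HTTP call. For the question's 1 Request -> 1 Output semantics, such a per-request `Source` could be reduced and then plugged into the outer stream with `flatMapConcat`.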