Tags: scala, akka-stream, akka-http

Chain Akka-http-client requests in a Stream


I would like to chain HTTP requests using akka-http-client as a Stream. Each HTTP request in the chain depends on the success/response of the previous request and uses it to construct a new request. If a request is not successful, the Stream should return the response of the unsuccessful request.

How can I construct such a stream in akka-http? Which akka-http client-level API should I use?


Solution

  • If you're making a web crawler, have a look at this post. This answer tackles a simpler case, such as downloading paginated resources, where the link to the next page is in a header of the current page's response.

    You can create a chained source, where one item leads to the next, using the Source.unfoldAsync method. It takes an initial state of type S and a function from S to Future[Option[(S, E)]]. Returning Some((nextState, element)) emits element (of type E) and passes nextState to the next invocation; returning None completes the stream.
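    As a minimal sketch of how the state and the emitted element relate, here is a countdown built with unfoldAsync. The object and method names are made up for illustration, and the snippet assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object UnfoldAsyncSketch {
  // Counts down from `start`: the state S is the remaining count,
  // and each emitted element E is the count at that step.
  def countdown(start: Int): Seq[Int] = {
    // Assumption: Akka 2.6+, so the implicit system also provides a Materializer.
    implicit val system: ActorSystem = ActorSystem("unfold-sketch")
    import system.dispatcher

    val source = Source.unfoldAsync[Int, Int](start) { n =>
      Future {
        if (n > 0) Some((n - 1, n)) // continue with (next state, emitted element)
        else None                   // None ends the stream
      }
    }

    try Await.result(source.runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

    Calling UnfoldAsyncSketch.countdown(3) collects the elements emitted for the states 3, 2 and 1; the state 0 produces None, which completes the stream.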

    In your case, this amounts to:

    1. taking an initial HttpRequest
    2. producing a Future[HttpResponse]
    3. if the response points to another URL, returning Some(nextRequest -> response), otherwise None

    However, there's a wrinkle: a response that doesn't contain a pointer to the next request is never emitted, because returning None ends the stream without emitting anything. The last response of the chain would therefore be lost.

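    To get around this, you can make the function passed to unfoldAsync return Future[Option[(Option[HttpRequest], HttpResponse)]]. This allows you to handle the following situations:

    • the response is an error: return Some(None -> response), so the errored response is emitted and the stream ends afterwards
    • the response points to a next request: return Some(Some(nextRequest) -> response)
    • the response has no next request: return Some(None -> response), so the final response is still emitted
    • the state is None (nothing left to request): return None to complete the stream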
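    The effect of the optional state can be seen without any HTTP at all. The sketch below uses a hypothetical in-memory Page type as a stand-in for a response and compares a plain state with an Option state; all names are illustrative, and Akka 2.6 or later is assumed.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object OptionalStateSketch {
  // Hypothetical stand-in for a response: a body plus an optional pointer to the next page.
  final case class Page(body: String, next: Option[Int])

  val pages: Map[Int, Page] = Map(
    1 -> Page("a", Some(2)),
    2 -> Page("b", Some(3)),
    3 -> Page("c", None) // last page: no pointer to a next one
  )

  // Plain state: the stream stops as soon as a page has no `next`,
  // so that final page is never emitted.
  def naive(id: Int): Future[Option[(Int, Page)]] =
    Future.successful(pages(id).next.map(nextId => nextId -> pages(id)))

  // Optional state: the final page is emitted together with the state None,
  // and the following invocation completes the stream.
  def withOptionalState(state: Option[Int]): Future[Option[(Option[Int], Page)]] =
    Future.successful(state.map(id => pages(id).next -> pages(id)))

  // Returns the bodies emitted by the naive and the optional-state variants.
  def run(): (Seq[String], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("optional-state-sketch")
    try {
      val plain = Source.unfoldAsync[Int, Page](1)(naive).map(_.body).runWith(Sink.seq)
      val optional = Source
        .unfoldAsync[Option[Int], Page](Some(1))(withOptionalState)
        .map(_.body)
        .runWith(Sink.seq)
      (Await.result(plain, 5.seconds), Await.result(optional, 5.seconds))
    } finally system.terminate()
  }
}
```

    The naive variant drops page "c", while the optional-state variant emits all three pages before completing.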

    What follows is some annotated code which outlines this approach, but first a preliminary:

    When streaming HTTP requests to responses in Akka Streams, you need to ensure that the response body is consumed. Otherwise the connection stays back-pressured, and later requests can stall or deadlock. If you don't need the body you can discard it, but here we use a function to convert the HttpEntity from a (potential) stream into a strict entity:

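    import scala.concurrent.Future
    import scala.concurrent.duration._
    import akka.http.scaladsl.model.HttpResponse
    
    // Requires an implicit Materializer and ExecutionContext in scope.
    def convertToStrict(r: HttpResponse): Future[HttpResponse] =
      r.entity.toStrict(10.minutes).map(e => r.withEntity(e))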
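
    If the body is not needed at all, discarding it is an alternative to making it strict. A minimal sketch, where statusOnly is a hypothetical helper name rather than part of Akka HTTP:

```scala
import akka.http.scaladsl.model.{HttpResponse, StatusCode}
import akka.stream.Materializer

object DiscardSketch {
  // Drains the response body without buffering it, then returns the status.
  // `statusOnly` is a hypothetical helper, not an Akka HTTP API.
  def statusOnly(r: HttpResponse)(implicit mat: Materializer): StatusCode = {
    r.discardEntityBytes() // consumes the entity stream so the connection can be reused
    r.status
  }
}
```

    This avoids holding large bodies in memory, at the cost of not being able to read them later.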
    

    Next, a couple of functions to create an Option[HttpRequest] from an HttpResponse. This example uses a scheme like GitHub's pagination links, where the Link header contains, e.g.: <https://api.github.com/...>; rel="next":

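    import akka.http.scaladsl.model.{HttpMethods, HttpRequest, HttpResponse, Uri}
    import akka.http.scaladsl.model.headers.Link
    
    def nextUri(r: HttpResponse): Seq[Uri] = for {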
      linkHeader <- r.header[Link].toSeq
      value <- linkHeader.values
      params <- value.params if params.key == "rel" && params.value() == "next"
    } yield value.uri
    
    def getNextRequest(r: HttpResponse): Option[HttpRequest] =
      nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next))
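
    To check the header handling without a server, a response with a Link header can be built by hand. The sketch below is an alternative formulation, not the code above: it compares each parameter against the predefined LinkParams.next, which only matches a rel value of exactly "next". The object name and the URL are placeholders.

```scala
import akka.http.scaladsl.model.{HttpResponse, Uri}
import akka.http.scaladsl.model.headers.{Link, LinkParams}

object LinkHeaderSketch {
  // Alternative to inspecting key and value by hand: LinkParams.next is the
  // predefined rel("next") parameter, so a plain equality check is enough.
  def nextUris(r: HttpResponse): Seq[Uri] = for {
    link  <- r.header[Link].toSeq
    value <- link.values
    if value.params.contains(LinkParams.next)
  } yield value.uri

  // A hand-built response carrying a Link header; the URL is a placeholder.
  val page1: HttpResponse =
    HttpResponse(headers = List(Link(Uri("https://example.com/items?page=2"), LinkParams.next)))

  // A response without a Link header, as the last page would be.
  val lastPage: HttpResponse = HttpResponse()
}
```

    LinkHeaderSketch.nextUris(LinkHeaderSketch.page1) yields the placeholder URI, and the same call on lastPage yields an empty sequence.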
    

    Next, the real function we'll pass to unfoldAsync. It uses the Akka HTTP Http().singleRequest() API (the request-level client API, which needs an implicit ActorSystem in scope) to take an HttpRequest and produce a Future[HttpResponse]:

    def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
      reqOption match {
        case Some(req) => Http().singleRequest(req).flatMap { response =>
          // Handle the error case. The body is still made strict so that it
          // is consumed; the errored response is returned with no next item.
          if (response.status.isFailure())
            convertToStrict(response).map(errored => Some(None -> errored))
    
          // Otherwise, convert the response to a strict response by
          // taking up the body and looking for a next request.
          else convertToStrict(response).map { strictResponse =>
            getNextRequest(strictResponse) match {
              // If we have no next request, return Some containing an
              // empty state, but the current value
              case None => Some(None -> strictResponse)
    
              // Otherwise, pass on the request...
              case next => Some(next -> strictResponse)
            }
          }
        }
        // Finally, there's no next request, end the stream by
        // returning none as the state.
        case None => Future.successful(None)
      }
    

    Note that if we get an errored response, the stream will not continue, since we return None as the next state.

    You can invoke this to get a stream of HttpResponse objects like so:

    val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com")
    Source.unfoldAsync[Option[HttpRequest], HttpResponse](
        Some(initialRequest))(chainRequests)
    

    As for returning the value of the last (or errored) response, you simply need to use Sink.last, since the stream will end either when it completes successfully or on the first errored response. For example:

    def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
          Some(initialRequest))(chainRequests)
        .map(_.status)
        .runWith(Sink.last)
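
    To try the whole chain without a network, the call to Http().singleRequest can be swapped for a stub. The following is a sketch under stated assumptions, not the code above verbatim: the Send type, the stub, the example.invalid URLs and the three-page layout are all invented for illustration, the stubbed responses are already strict so convertToStrict is left out, and Akka 2.6 or later is assumed.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Link, LinkParams}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object OfflineChainSketch {
  // Hypothetical transport type; Http().singleRequest(_) has this shape.
  type Send = HttpRequest => Future[HttpResponse]
  type Step = Option[HttpRequest] => Future[Option[(Option[HttpRequest], HttpResponse)]]

  private def page(n: Int): Uri = Uri(s"http://example.invalid/page/$n")

  // Stub transport: pages 1 and 2 link to their successor, page 3 has no
  // Link header. If `failAt` is set, that page answers with 500 instead.
  def stub(failAt: Option[Int]): Send = { req =>
    val n = req.uri.path.toString.split('/').last.toInt
    val response =
      if (failAt.contains(n)) HttpResponse(StatusCodes.InternalServerError)
      else if (n < 3) HttpResponse(headers = List(Link(page(n + 1), LinkParams.next)))
      else HttpResponse()
    Future.successful(response)
  }

  // Same shape as chainRequests, with the transport passed in.
  def chain(send: Send)(implicit ec: ExecutionContext): Step = {
    case None => Future.successful(None) // nothing left to request: complete
    case Some(req) =>
      send(req).map { response =>
        val next: Option[HttpRequest] =
          if (response.status.isFailure()) None // stop after an errored response
          else response.header[Link].toList
            .flatMap(_.values)
            .find(_.params.contains(LinkParams.next))
            .map(v => HttpRequest(HttpMethods.GET, v.uri))
        Some(next -> response)
      }
  }

  // Collects every status seen; the last element is the final (or errored) one,
  // which is what Sink.last would return.
  def statuses(failAt: Option[Int]): Seq[StatusCode] = {
    implicit val system: ActorSystem = ActorSystem("offline-chain-sketch")
    import system.dispatcher
    val first: Option[HttpRequest] = Some(HttpRequest(HttpMethods.GET, page(1)))
    val result = Source
      .unfoldAsync[Option[HttpRequest], HttpResponse](first)(chain(stub(failAt)))
      .map(_.status)
      .runWith(Sink.seq)
    try Await.result(result, 5.seconds)
    finally system.terminate()
  }
}
```

    With failAt = None the stub serves three successful pages; with failAt = Some(2) the stream ends after the second response, whose status is the 500 that Sink.last would surface.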