scalaakkahttp-streamingakka-streamakka-http

akka-http chunked response concatenation


I'm using akka-http to make a request to a http service which sends back chunked response. This is how the relevant bit of code looks like:

val httpRequest: HttpRequest = //build the request
val request = Http().singleRequest(httpRequest)
request.flatMap { response =>
    response.entity.dataBytes.runForeach { chunk =>
        println("-----")
        println(chunk.utf8String)
    }
}

and the output produced in the command line looks something like this:

-----
{"data":
-----
"some text"}

-----
{"data":
-----
"this is a longer
-----
text"}

-----
{"data": "txt"}

-----
...

The logical piece of data - a json in this case ends with an end of line symbol \r\n, but the problem is, that the json doesn't always fit in a single http response chunk as clearly visible in the example above.

My question is - how do I concatenate the incoming chunked data into full jsons so that the resulting container type would still remain either Source[Out,M1] or Flow[In,Out,M2]? I'd like to follow the idealogy of akka-stream.

UPDATE: It's worth mentioning also, that the response is endless and the aggregation must be done in real time


Solution

  • Found a solution:

    val request: HttpRequest = //build the request
    request.flatMap { response =>
        response.entity.dataBytes.scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
            .filter(_.contains("\r\n"))
            .runForeach { json =>
                println("-----")
                println(json)
            }
    }