I am trying to download a large file from S3 and send its data to another actor that makes an HTTP request and then persists the response. I want to limit the number of requests sent by that actor, so I need to handle backpressure.
I tried something like this:
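import akka.Done
import akka.pattern.ask
import akka.stream.alpakka.csv.scaladsl.CsvParsing
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

// Declared once so that both ask (?) calls below can see it
implicit val askTimeout: Timeout = Timeout(10.seconds)

S3.download(bckt, bcktKey).map {
  case Some((file, _)) =>
    file
      .via(CsvParsing.lineScanner())
      .map(_.map(_.utf8String))
      .drop(1) // drop the CSV header line
      .map(p => Foo(p.head, p(1)))
      .mapAsync(30)(p => (httpClientActor ? p).mapTo[Buzz]) // at most 30 asks in flight
      .mapAsync(1)(b => (persistActor ? b).mapTo[Done])      // persist one response at a time
      .runWith(Sink.head)
  case None =>
    Future.successful(Done) // the object does not exist in the bucket
}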
The problem is that only 30 lines are read from the file, which matches the parallelism I set. I am not sure this is the correct way to achieve what I'm looking for.
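Whether `mapAsync(30)` by itself limits the number of concurrent requests can be checked in isolation. The sketch below assumes Akka 2.6 with the classic `ActorSystem`; `MapAsyncLimitDemo` and `fakeRequest` are hypothetical names, and `fakeRequest` (a 10 ms `akka.pattern.after` delay) only stands in for the ask to `httpClientActor`. It records the peak number of requests that were in flight at the same time.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncLimitDemo {
  /** Sends 200 fake requests through mapAsync(parallelism) and returns the
    * highest number of requests that were ever in flight at the same time. */
  def maxInFlight(parallelism: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("map-async-demo")
    import system.dispatcher // ExecutionContext for `after` and `andThen`

    val inFlight = new AtomicInteger(0)
    val peak = new AtomicInteger(0)

    // Hypothetical stand-in for the HTTP call: "responds" after about 10 ms
    def fakeRequest(i: Int): Future[Int] = {
      val now = inFlight.incrementAndGet()
      peak.updateAndGet(p => math.max(p, now))
      after(10.millis, system.scheduler)(Future.successful(i))
        // decrement before the returned future completes, i.e. before mapAsync sees the result
        .andThen { case _ => inFlight.decrementAndGet() }
    }

    try {
      val done = Source(1 to 200).mapAsync(parallelism)(fakeRequest).runWith(Sink.ignore)
      Await.result(done, 10.seconds)
      peak.get
    } finally {
      system.terminate()
    }
  }
}
```

`MapAsyncLimitDemo.maxInFlight(4)` never exceeds 4: `mapAsync` only pulls a new element while fewer than `parallelism` futures are outstanding, which is the backpressure the question is after.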
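As I mentioned in the comment, the likely cause is Sink.head: it completes as soon as it receives the first element and then cancels upstream, so mapAsync(30) only gets to pull about 30 lines before the stream stops. If that is not the reason, you can backpressure the stream using Sink.actorRefWithBackpressure.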
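The Sink.head explanation can be checked without S3 or actors. This is a minimal sketch assuming Akka 2.6 with the classic `ActorSystem`: `Source(1 to 1000)` plays the CSV lines, a 50 ms `akka.pattern.after` delay plays the ask to `httpClientActor`, and `SinkHeadDemo`/`pulledWith` are hypothetical names. It counts how many lines are pulled from the source for a given sink.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SinkHeadDemo {
  /** Streams 1000 fake lines through mapAsync(30) into `sink` and returns
    * the sink's result plus how many lines were pulled from the source. */
  def pulledWith[M](sink: Sink[Int, Future[M]]): (M, Int) = {
    implicit val system: ActorSystem = ActorSystem("sink-head-demo")
    import system.dispatcher // ExecutionContext for `after`
    try {
      val pulled = new AtomicInteger(0)
      val result = Source(1 to 1000)
        .map { line => pulled.incrementAndGet(); line } // count lines read "from the file"
        .mapAsync(30) { line =>
          // Hypothetical stand-in for the ask: completes after about 50 ms
          after(50.millis, system.scheduler)(Future.successful(line))
        }
        .runWith(sink)
      (Await.result(result, 10.seconds), pulled.get)
    } finally {
      system.terminate()
    }
  }
}
```

With `SinkHeadDemo.pulledWith(Sink.head[Int])` the first result is `1` and only a small number of lines (typically close to the parallelism of 30) is pulled, because the sink cancels upstream after one element; with `Sink.ignore` all 1000 lines are pulled.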
Sample code:
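import akka.Done
import akka.actor.{Actor, Status}
import akka.pattern.ask
import akka.stream.alpakka.csv.scaladsl.CsvParsing
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.{Sink, Source}
import akka.util.{ByteString, Timeout}
import scala.concurrent.duration._

// Assumes an implicit ActorSystem plus bckt, bcktKey, Foo, Buzz, httpClientActor and persistActor from the question

class PersistActor extends Actor {
  override def receive: Receive = {
    case "init" =>
      println("Initialized")
      sender() ! Done // the sink waits for this ack before sending the first element
    case "complete" =>
      context.stop(self)
    case Status.Failure(_) =>
      context.stop(self) // the stream failed upstream
    case buzz: Buzz =>
      // Persist buzz here, then ack so the sink sends the next element
      sender() ! Done
  }
}

// Arguments: target, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage
val sink = Sink.actorRefWithBackpressure[Buzz](
  persistActor, "init", Done, "complete", (ex: Throwable) => Status.Failure(ex))

implicit val askTimeout: Timeout = Timeout(10.seconds)

S3.download(bckt, bcktKey)
  .flatMapConcat {
    case Some((file, _)) => file
    case None            => Source.empty[ByteString] // the object does not exist
  }
  .via(CsvParsing.lineScanner())
  .map(_.map(_.utf8String))
  .drop(1) // drop the CSV header line
  .map(p => Foo(p.head, p(1)))
  // mapAsync(30) backpressures this step too: at most 30 asks are in flight at once
  .mapAsync(30)(p => (httpClientActor ? p).mapTo[Buzz])
  .to(sink) // the next Buzz is only sent after persistActor acks the previous one
  .run()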
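The handshake that Sink.actorRefWithBackpressure relies on can also be seen in isolation. The sink sends `onInitMessage` first, waits for `ackMessage`, and then sends each element only after the previous one has been acknowledged. This sketch assumes Akka 2.6 classic actors; `AckProtocol`, `RecordingActor` and `BackpressureSinkDemo` are hypothetical names, and the `Promise` only exists so the caller can wait for the outcome, since this sink materializes `NotUsed`.

```scala
import akka.Done
import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical protocol messages for the sink
object AckProtocol {
  case object Init
  case object Completed
  final case class StreamFailed(ex: Throwable)
}

// Hypothetical stand-in for PersistActor: records elements instead of persisting them
class RecordingActor(result: Promise[Vector[Int]]) extends Actor {
  import AckProtocol._
  private var received = Vector.empty[Int]

  override def receive: Receive = {
    case Init =>
      sender() ! Done // without this ack the sink never sends the first element
    case n: Int =>
      received :+= n  // "persist" the element
      sender() ! Done // ack: ready for the next element
    case Completed =>
      result.success(received)
      context.stop(self)
    case StreamFailed(ex) =>
      result.failure(ex)
      context.stop(self)
  }
}

object BackpressureSinkDemo {
  import AckProtocol._

  def run(): Vector[Int] = {
    implicit val system: ActorSystem = ActorSystem("ack-demo")
    try {
      val result = Promise[Vector[Int]]()
      val recorder = system.actorOf(Props(new RecordingActor(result)))
      // Arguments: target, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage
      val sink = Sink.actorRefWithBackpressure[Int](
        recorder, Init, Done, Completed, (ex: Throwable) => StreamFailed(ex))
      Source(1 to 100).runWith(sink)
      Await.result(result.future, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

`BackpressureSinkDemo.run()` returns the elements 1 to 100 in order. If the `Init` case did not reply with `Done`, the stream would stall before the first element and the `Await` would time out.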