Tags: scala, akka-stream, alpakka

How to handle backpressure when streaming a file from S3 with actor interop


I am trying to download a large file from S3 and send its data to another actor that makes an HTTP request, and then persist the response. I want to limit the number of requests sent by that actor, so I need to handle backpressure.
I tried something like this:

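    import akka.Done
    import akka.pattern.ask
    import akka.stream.alpakka.csv.scaladsl.CsvParsing
    import akka.stream.alpakka.s3.scaladsl.S3
    import akka.stream.scaladsl.Sink
    import akka.util.Timeout
    import scala.concurrent.duration._

    implicit val askTimeout: Timeout = Timeout(10.seconds)

    S3.download(bckt, bcktKey).map {
      case Some((file, _)) =>
        file
          .via(CsvParsing.lineScanner())
          .map(_.map(_.utf8String))
          .drop(1) // drop headers
          .map(p => Foo(p.head, p(1)))
          .mapAsync(30)(p => (httpClientActor ? p).mapTo[Buzz])
          .mapAsync(1) { case b @ Buzz(_, _) =>
            (persistActor ? b).mapTo[Done]
          }
          .runWith(Sink.head)
    }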

The problem is that it reads only 30 lines from the file, the same number as the parallelism I set in mapAsync(30). I am not sure this is the correct way to achieve what I'm looking for.
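The symptom can be reproduced without S3 or actors. This is a minimal sketch, assuming Akka 2.6+ (where an implicit ActorSystem supplies the Materializer). Source(1 to 1000) is a hypothetical stand-in for the CSV lines, Future(n) stands in for the ask to httpClientActor, and SinkHeadRepro/pulledWith are illustrative names. It counts how many elements the source emits before the stream stops, once with Sink.head as in the code above and once with Sink.ignore for comparison.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object SinkHeadRepro {
  // Runs 1000 fake "lines" through mapAsync(30) into the given sink and
  // returns how many of them the source emitted before the stream stopped.
  def pulledWith[M](sink: Sink[Int, Future[M]]): Int = {
    implicit val system: ActorSystem = ActorSystem("repro")
    import system.dispatcher
    val pulled = new AtomicInteger(0)
    try {
      val done = Source(1 to 1000)            // hypothetical stand-in for the CSV lines
        .map { n => pulled.incrementAndGet(); n }
        .mapAsync(30)(n => Future(n))         // hypothetical stand-in for the ask to httpClientActor
        .runWith(sink)
      Await.ready(done, 10.seconds)
      pulled.get()
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"Sink.head:   ${pulledWith(Sink.head[Int])} lines read")
    println(s"Sink.ignore: ${pulledWith(Sink.ignore)} lines read")
  }
}
```

With Sink.head the count should stay around the parallelism of 30: mapAsync(30) pulls up to 30 lines while its futures are in flight, and Sink.head cancels the stream as soon as the first result arrives. With Sink.ignore all 1000 lines are read.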


Solution

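  • If the cause is not your use of Sink.head, as I mentioned in the comment (Sink.head completes as soon as the first element arrives and then cancels the stream, so only the roughly 30 elements already pulled by mapAsync(30) are ever read), you can backpressure the stream using Sink.actorRefWithBackpressure.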

    Sample code:

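    import akka.Done
    import akka.actor.{Actor, Props, Status}
    import akka.stream.scaladsl.Sink

    class PersistActor extends Actor {
      override def receive: Receive = {
        case "init" =>
          println("Initialized")
          sender() ! Done // the init message must be acked too, otherwise no element is ever sent
        case "complete" =>
          context.stop(self)
        case Status.Failure(ex) =>
          println(s"Stream failed: $ex")
          context.stop(self)
        case buzz: Buzz =>
          // persist buzz here, and reply only once it has been persisted
          sender() ! Done // each ack lets the sink send one more element
      }
    }

    // assumes an ActorSystem named `system` is in scope
    val persistActor = system.actorOf(Props[PersistActor](), "persistActor")

    // Stream failures are sent as Status.Failure; PartialFunction.empty would throw a MatchError here.
    val sink = Sink.actorRefWithBackpressure[Buzz](
      persistActor, "init", Done, "complete", (ex: Throwable) => Status.Failure(ex))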
    
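    import akka.pattern.ask
    import akka.stream.alpakka.csv.scaladsl.CsvParsing
    import akka.stream.alpakka.s3.scaladsl.S3
    import akka.stream.scaladsl.Source
    import akka.util.Timeout
    import scala.concurrent.duration._

    implicit val askTimeout: Timeout = Timeout(10.seconds)

    S3.download(bckt, bcktKey)
      .flatMapConcat {
        case Some((file, _)) => file
        case None            => Source.empty // object not found: emit nothing
      }
      .via(CsvParsing.lineScanner())
      .map(_.map(_.utf8String))
      .drop(1) // drop headers
      .map(p => Foo(p.head, p(1)))
      // mapAsync already backpressures: at most 30 asks are in flight at a time
      .mapAsync(30)(p => (httpClientActor ? p).mapTo[Buzz])
      .runWith(sink) // needs an implicit ActorSystem (Akka 2.6+) or Materializer in scope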
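For experimenting, here is a self-contained sketch of the same pattern that runs without S3, assuming Akka 2.6 or later (where this five-argument Sink.actorRefWithBackpressure overload exists). The in-memory Source, the FakeHttpClient actor, the key/value fields of Foo and Buzz, and the Promise used to report what was persisted are all hypothetical stand-ins. The points it illustrates: PersistActor must reply with the ack message (Done) to "init" as well as to every element, and the sink sends the next element to the actor only after that ack while buffering only a small, bounded number of elements, so a slow PersistActor eventually backpressures mapAsync and the source.

```scala
import akka.Done
import akka.actor.{Actor, ActorSystem, Props, Status}
import akka.pattern.ask
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout

import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}

// Hypothetical stand-ins for the question's Foo and Buzz messages.
final case class Foo(key: String, value: String)
final case class Buzz(key: String, value: String)

// Hypothetical stand-in for httpClientActor: answers every Foo with a Buzz.
class FakeHttpClient extends Actor {
  override def receive: Receive = { case Foo(k, v) => sender() ! Buzz(k, v) }
}

// Collects every Buzz (standing in for persistence) and acks it with Done.
class PersistActor(persisted: Promise[Vector[Buzz]]) extends Actor {
  private var seen = Vector.empty[Buzz]

  override def receive: Receive = {
    case "init" =>
      sender() ! Done // ack the init message, otherwise no element is ever sent
    case b: Buzz =>
      seen :+= b      // real persistence would happen here
      sender() ! Done // each ack lets the sink send one more element
    case "complete" =>
      persisted.success(seen)
      context.stop(self)
    case Status.Failure(ex) =>
      persisted.failure(ex)
      context.stop(self)
  }
}

object BackpressureDemo {
  def run(lines: Int): Vector[Buzz] = {
    implicit val system: ActorSystem = ActorSystem("demo")
    implicit val askTimeout: Timeout = Timeout(10.seconds)
    try {
      val httpClientActor = system.actorOf(Props[FakeHttpClient]())
      val persisted = Promise[Vector[Buzz]]()
      val persistActor = system.actorOf(Props(new PersistActor(persisted)))

      val sink = Sink.actorRefWithBackpressure[Buzz](
        persistActor, "init", Done, "complete", (ex: Throwable) => Status.Failure(ex))

      Source(1 to lines) // hypothetical stand-in for the CSV lines
        .map(n => Foo(s"key$n", s"value$n"))
        .mapAsync(30)(foo => (httpClientActor ? foo).mapTo[Buzz])
        .runWith(sink)

      Await.result(persisted.future, 30.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"persisted ${run(1000).size} records")
}
```

Because mapAsync preserves input order, the records reach PersistActor in the same order as the lines.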