scalaakka-stream

How to create akka source that is materialized to ActorRef in which the incoming messages know the sender


  val ref = Source.actorRef[String](
    completionMatcher =  PartialFunction.empty,
    failureMatcher = PartialFunction.empty,
    100000,
    OverflowStrategy.dropNew
  ).to(Sink.foreachAsync(1){ elem =>
    // how to reply to sender
    Future.successful()
  })

Above is example that does nearly what I need, with the exception that the underlying message does not know the sender. So it's impossible to reply. Is there a way or pattern which would allow me to reply to the sender, so that it can be used with ask pattern like:

  import akka.pattern.ask
  (ref ? "request").onComplete {
    case Failure(exception) => logger.error(s"Couldn't receive response", exception)
    case Success(value) => logger.info(s"Received response ${value}")
  }

Solution

  • This isn't possible with the Classic actor-based API and Source.actorRef, which as you note discards the sender.

    However, if using Akka 2.6+, one could use the Typed ask pattern in conjunction with akka.actor.typed.scaladsl.adapter.ClassicActorRefOps to include the sending actor (in this case the synthetic actor for the ask) in the message being sent to Source.actorRef.

    For your example, you would rewrite the stream as something like:

    import akka.actor.typed.{ ActorRef => TypedActorRef }
    
    val ref = Source.actorRef[(String, TypedActorRef[Any])](
      completionMatcher =  PartialFunction.empty,
      failureMatcher = PartialFunction.empty,
      100000,
      OverflowStrategy.dropNew
    ).to(Sink.foreach { // Sink.foreachAsync(1) isn't doing anything here...
      case (elem, sender) =>
        // do stuff with elem?
        sender ! "response"
    }
    

    And then to ask

    import akka.actor.typed.scaladsl.AskPattern._
    import akka.actor.typed.scaladsl.Scheduler
    import akka.actor.typed.scaladsl.adapter.{ ClassicActorRefOps, ClassicActorSystemOps }
    
    // actorSystem is the classic ActorSystem in use
    implicit val scheduler: Scheduler = actorSystem.toTyped.scheduler
    
    ref.toTyped.ask[Any]("request" -> _).onComplete {
      case Failure(exception) => logger.error(s"Couldn't receive response", exception)
      case Success(value) => logger.info(s"Received response ${value}")
    }