scala akka akka-actor

Getting Akka dead letters when using pipeTo between two actors


I have a use case with the following actor hierarchy:

parent -> childABC -> workerchild

The worker child does the work and sends its result to its parent (childABC, which is itself a child of parent), and childABC then sends the result back to the parent actor. I am using pipeTo and getting dead letters. Here is my code.

Parent actor:

final case object GetFinalValue

class MyActor extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myManageActor = context.actorOf(Props[ManagerMyActor],"Managemyactor")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

  override def receive: Receive = {
    case GetFinalValue=>
      ask(myManageActor,GetValue).pipeTo(sender())

    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }

}

childABC (as in the hierarchy above):

final case object GetValue

class ManagerMyActor extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myTokenActor = context.actorOf(Props[TokenMyActor2],"toknMyActor2")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

  override def receive: Receive = {
    case GetValue=>
      ask(myTokenActor,CalculateValue).pipeTo(sender())
 
    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }

}

Worker child actor:

final case object CalculateValue

class TokenMyActor2 extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)

  override def receive: Receive = {
    case CalculateValue=>
      val future = Future{ "get the string"
      }
      val bac = future.map{result =>
          sender ! result
      }//.pipeTo(sender())


    case message =>
      log.warn("Actor MyActor: Unhandled message received : {}", message)
      unhandled(message)
  }

}


def main(args: Array[String]): Unit = {
    implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

    val myActor = system.actorOf(Props[MyActor],"myActor")
    val future = ask(myActor, GetFinalValue).mapTo[String]
    future.map {str =>
      log.info ("string is {}",str)
    }
}

Here are the logs:

[INFO] [akkaDeadLetter][01/12/2021 19:17:22.000] [api-akka.actor.default-dispatcher-5] [akka://api/deadLetters] Message [java.lang.String] from Actor[akka://api/user/myActor/Managemyactor/toknMyActor2#1239397461] to Actor[akka://api/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.989] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor#1829301550] to Actor[akka://api/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.996] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor/Managemyactor#-269929265] to Actor[akka://api/deadLetters] was not delivered. [3] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Please tell me where I went wrong. Should pipeTo not be used like this? If so, what should I do to make it work?


Solution

  • It may or may not be intentional, but ask(myManageActor, GetValue).pipeTo(sender()) can be implemented more simply with forward:

    class MyActor extends Actor {
      lazy val myManageActor: ActorRef = ???
    
      override def receive: Receive = {
        case GetFinalValue =>
          myManageActor.forward(GetValue)
      }
    }
    

    forward is the same as tell, but it preserves the original sender of the message.

    This can be applied to MyActor and ManagerMyActor.
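
    To illustrate how forward keeps the sender, here is a minimal sketch, assuming Akka classic actors. The names Ping, Echo, Relay and ForwardSketch are hypothetical and not part of the question's code. Relay forwards the message, so Echo's reply goes straight back to the original asker instead of to Relay:

    ```scala
    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    // Hypothetical message, used only for this illustration.
    case object Ping

    class Echo extends Actor {
      // Replies to whoever is recorded as the sender of the message.
      def receive: Receive = { case Ping => sender() ! "pong" }
    }

    class Relay(target: ActorRef) extends Actor {
      def receive: Receive = {
        // Same effect as target.tell(Ping, sender()): the original sender is kept.
        case Ping => target.forward(Ping)
      }
    }

    object ForwardSketch {
      def run(): String = {
        val system = ActorSystem("forward-sketch")
        implicit val timeout: Timeout = Timeout(3.seconds)
        try {
          val echo  = system.actorOf(Props(new Echo), "echo")
          val relay = system.actorOf(Props(new Relay(echo)), "relay")
          // The ask is answered by Echo directly, although it was sent to Relay.
          Await.result((relay ? Ping).mapTo[String], 3.seconds)
        } finally system.terminate()
      }
    }
    ```

    Relay never sees the reply in this sketch, which is why no timeout or future is needed inside it.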

    In the case of TokenMyActor2, you should not use

    future.map { result =>
      sender ! result
    }
    

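    because it breaks the actor's encapsulation: the callback runs on a different thread, possibly after the actor has finished handling the message, at which point sender() can resolve to deadLetters (this matches the first dead-letter entry in the question's logs). The Akka docs describe the problem: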

    When using future callbacks, such as onComplete, or map such as thenRun, or thenApply inside actors you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods or access mutable state on the enclosing actor from within the callback. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the callback will be scheduled concurrently to the enclosing actor. Unfortunately there is not yet a way to detect these illegal accesses at compile time. See also: Actors and shared mutable state

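    You should instead use Future(???).pipeTo(sender()), which is safe because sender() is evaluated immediately, on the actor's own thread, when pipeTo is called rather than when the future completes.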
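
    If a callback is still preferred over pipeTo, the same safety can be had by reading sender() into a local val before the future is created. The following is only a sketch under that assumption; Compute, CapturingWorker and CaptureSenderSketch are hypothetical names:

    ```scala
    import akka.actor.{Actor, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    // Hypothetical message, used only for this illustration.
    case object Compute

    class CapturingWorker extends Actor {
      import context.dispatcher

      def receive: Receive = {
        case Compute =>
          // Evaluated here, on the actor's thread, while Compute is being handled.
          val replyTo = sender()
          // The callback only touches the immutable local val, never sender().
          Future("done").foreach(result => replyTo ! result)
      }
    }

    object CaptureSenderSketch {
      def run(): String = {
        val system = ActorSystem("capture-sender-sketch")
        implicit val timeout: Timeout = Timeout(3.seconds)
        try {
          val worker = system.actorOf(Props(new CapturingWorker), "worker")
          Await.result((worker ? Compute).mapTo[String], 3.seconds)
        } finally system.terminate()
      }
    }
    ```

    Unlike pipeTo, this hand-written callback does not report a failed future to the requester, so pipeTo remains the simpler choice in most cases.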

    After applying these changes, the code works as expected:

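    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.pattern.{ask, pipe}
    import akka.util.Timeout
    import scala.concurrent.Future
    import scala.concurrent.duration.SECONDS
    
    case object GetFinalValue
    case object GetValue
    case object CalculateValue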
    
    class MyActor extends Actor {
      private val myManageActor: ActorRef =
        context.actorOf(Props[ManagerMyActor], "myManageActor")
    
      override def receive: Receive = { case GetFinalValue =>
        myManageActor.forward(GetValue)
      }
    }
    
    class ManagerMyActor extends Actor {
      private val myTokenActor =
        context.actorOf(Props[TokenMyActor2], "toknMyActor2")
    
      override def receive: Receive = { case GetValue =>
        myTokenActor.forward(CalculateValue)
      }
    
    }
    
    class TokenMyActor2 extends Actor {
      import context.dispatcher
    
      override def receive: Receive = { case CalculateValue =>
        val future = Future { "get the string" }
        future.pipeTo(sender())
      }
    }
    
    implicit val timeout = Timeout(3, SECONDS)
    implicit val system = ActorSystem("adasd")
    import system.dispatcher
    val myActor = system.actorOf(Props[MyActor], "myActor")
    val future = ask(myActor, GetFinalValue).mapTo[String]
    future.foreach { str =>
      println(s"got $str")
    }
    
    

    This prints "got get the string".

    As a final note, I'd advise against using the ask pattern inside actors. The basic functionality of ask can usually be achieved with just tell and forward. The code is also shorter and no longer needs an implicit val timeout in every actor.
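
    When the intermediate actor has to transform the result, forward alone is not enough, but tell still works if the requester travels inside the messages. The sketch below is one possible way to do that, not the only one; Start, Request, Result, TellChild, TellParent and TellSketch are hypothetical names that do not appear in the question:

    ```scala
    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    // Hypothetical protocol: the original requester is carried in the messages.
    case object Start
    final case class Request(replyTo: ActorRef)
    final case class Result(value: String, replyTo: ActorRef)

    class TellChild extends Actor {
      def receive: Receive = {
        // Reply to the parent and hand the requester back unchanged.
        case Request(replyTo) => sender() ! Result("value", replyTo)
      }
    }

    class TellParent extends Actor {
      private val child = context.actorOf(Props[TellChild](), "child")

      def receive: Receive = {
        // Plain tell: no future and no timeout inside the actor.
        case Start => child ! Request(sender())
        // Post-process the child's result, then answer the original requester.
        case Result(value, replyTo) => replyTo ! value.toUpperCase
      }
    }

    object TellSketch {
      def run(): String = {
        val system = ActorSystem("tell-sketch")
        // A timeout is only needed at the boundary, where non-actor code asks.
        implicit val timeout: Timeout = Timeout(3.seconds)
        try {
          val parent = system.actorOf(Props[TellParent](), "parent")
          Await.result((parent ? Start).mapTo[String], 3.seconds)
        } finally system.terminate()
      }
    }
    ```

    The only ask left is the one issued from outside the actor system, which is where a future and a timeout are actually needed.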