I have a use case with the following actor hierarchy:
parent -> childABC -> workerchild
The worker child does its work and sends the result to its parent (childABC, which is itself a child of parent), and childABC then sends the result back to the parent actor. I am using pipeTo and getting dead letters. Here is my code.
Parent actor:
    final case object GetFinalValue

    class MyActor extends Actor {
      import context.dispatcher
      import akka.pattern.pipe
      val log = LoggerFactory.getLogger(this.getClass)
      val myManageActor = context.actorOf(Props[ManagerMyActor], "Managemyactor")
      implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)
      override def receive: Receive = {
        case GetFinalValue =>
          ask(myManageActor, GetValue).pipeTo(sender())
        case message =>
          log.warn(" Unhandled message received : {}", message)
          unhandled(message)
      }
    }
childABC (as in the hierarchy above):
    final case object GetValue

    class ManagerMyActor extends Actor {
      import context.dispatcher
      import akka.pattern.pipe
      val log = LoggerFactory.getLogger(this.getClass)
      val myTokenActor = context.actorOf(Props[TokenMyActor2], "toknMyActor2")
      implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)
      override def receive: Receive = {
        case GetValue =>
          ask(myTokenActor, CalculateValue).pipeTo(sender())
        case message =>
          log.warn(" Unhandled message received : {}", message)
          unhandled(message)
      }
    }
Worker child actor:
    final case object CalculateValue

    class TokenMyActor2 extends Actor {
      import context.dispatcher
      import akka.pattern.pipe
      val log = LoggerFactory.getLogger(this.getClass)
      override def receive: Receive = {
        case CalculateValue =>
          val future = Future {
            "get the string"
          }
          val bac = future.map { result =>
            sender ! result
          } //.pipeTo(sender())
        case message =>
          log.warn("Actor MyActor: Unhandled message received : {}", message)
          unhandled(message)
      }
    }
    def main(args: Array[String]): Unit = {
      implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)
      val myActor = system.actorOf(Props[MyActor], "myActor")
      val future = ask(myActor, GetFinalValue).mapTo[String]
      future.map { str =>
        log.info("string is {}", str)
      }
    }
Here are the logs:
[INFO] [akkaDeadLetter][01/12/2021 19:17:22.000] [api-akka.actor.default-dispatcher-5] [akka://api/deadLetters] Message [java.lang.String] from Actor[akka://api/user/myActor/Managemyactor/toknMyActor2#1239397461] to Actor[akka://api/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.989] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor#1829301550] to Actor[akka://api/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.996] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor/Managemyactor#-269929265] to Actor[akka://api/deadLetters] was not delivered. [3] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Where am I going wrong? Or should pipeTo not be used like this? If so, what should I do to make it work?
Not sure if it's intended or not, but ask(myManageActor, GetValue).pipeTo(sender()) can be implemented as forward.
    class MyActor extends Actor {
      lazy val myManageActor: ActorRef = ???
      override def receive: Receive = {
        case GetFinalValue =>
          myManageActor.forward(GetValue)
      }
    }
forward is the same as tell, but it preserves the original sender of the message. This can be applied to both MyActor and ManagerMyActor.
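To make the "same as tell" point concrete, here is a minimal sketch assuming Akka classic actors. The names Ping, Echo, ForwardingRelay, TellingRelay and ForwardDemo are hypothetical and only for illustration. Both relays pass the original asker along as the sender, so Echo replies straight to the caller in either case.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical message and actors, not part of the original code.
case object Ping

// Replies to whoever is recorded as the sender of the message.
class Echo extends Actor {
  def receive: Receive = { case Ping => sender() ! "pong" }
}

// Relays with forward: Echo sees the original asker as sender().
class ForwardingRelay(target: ActorRef) extends Actor {
  def receive: Receive = { case Ping => target.forward(Ping) }
}

// The same relay written with tell and an explicit sender.
class TellingRelay(target: ActorRef) extends Actor {
  def receive: Receive = { case Ping => target.tell(Ping, sender()) }
}

object ForwardDemo {
  def run(): (String, String) = {
    implicit val timeout: Timeout = Timeout(3.seconds)
    val system = ActorSystem("forwardDemo")
    try {
      val echo = system.actorOf(Props[Echo](), "echo")
      val viaForward = system.actorOf(Props(new ForwardingRelay(echo)), "viaForward")
      val viaTell = system.actorOf(Props(new TellingRelay(echo)), "viaTell")
      // Neither relay replies itself; the answer comes directly from Echo.
      val a = Await.result((viaForward ? Ping).mapTo[String], 3.seconds)
      val b = Await.result((viaTell ? Ping).mapTo[String], 3.seconds)
      (a, b)
    } finally system.terminate()
  }
}
```

Calling ForwardDemo.run() should give the same reply through both relays, since neither of them ever becomes the sender that Echo answers to.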
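In the case of TokenMyActor2, you should not use
    future.map { result =>
      sender ! result
    }
as it breaks actor encapsulation: the callback runs on another thread after receive has returned, so sender() may no longer refer to the original asker and can resolve to deadLetters, which matches the first dead letter in your log. As specified in the docs: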
When using future callbacks, such as onComplete, or map inside actors you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods or access mutable state on the enclosing actor from within the callback. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the callback will be scheduled concurrently to the enclosing actor. Unfortunately there is not yet a way to detect these illegal accesses at compile time. See also: Actors and shared mutable state
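You should instead rely on Future(???).pipeTo(sender()), which is safe to use with sender() because the argument is evaluated immediately, while the actor is still processing the message, rather than inside the callback.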
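As a rough illustration of the two safe variants, here is a sketch assuming Akka classic. Work, CapturingWorker, PipingWorker and SenderDemo are hypothetical names introduced for this example. The first actor reads sender() into a local val before the future runs; the second hands sender() to pipeTo as an ordinary argument.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical message and actor names, for illustration only.
case object Work

// Variant 1: read sender() on the actor's thread, then close over the val.
class CapturingWorker extends Actor {
  import context.dispatcher
  def receive: Receive = {
    case Work =>
      val replyTo = sender() // evaluated while Work is being processed
      Future("get the string").foreach(result => replyTo ! result)
  }
}

// Variant 2: pipeTo receives the already-evaluated ActorRef as its argument.
class PipingWorker extends Actor {
  import context.dispatcher
  def receive: Receive = {
    case Work => Future("get the string").pipeTo(sender())
  }
}

object SenderDemo {
  def run(): (String, String) = {
    implicit val timeout: Timeout = Timeout(3.seconds)
    val system = ActorSystem("senderDemo")
    try {
      val capturing = system.actorOf(Props[CapturingWorker](), "capturing")
      val piping = system.actorOf(Props[PipingWorker](), "piping")
      val a = Await.result((capturing ? Work).mapTo[String], 3.seconds)
      val b = Await.result((piping ? Work).mapTo[String], 3.seconds)
      (a, b)
    } finally system.terminate()
  }
}
```

Both variants deliver the string to the asker. pipeTo is usually the more convenient one because it also forwards a failed future as a Status.Failure message.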
After applying these changes, the code works as expected:
    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.pattern.{ask, pipe}
    import akka.util.Timeout
    import scala.concurrent.Future
    import scala.concurrent.duration.SECONDS

    case object GetFinalValue
    case object GetValue
    case object CalculateValue

    class MyActor extends Actor {
      private val myManageActor: ActorRef =
        context.actorOf(Props[ManagerMyActor], "myManageActor")
      // forward keeps the original asker as the sender
      override def receive: Receive = { case GetFinalValue =>
        myManageActor.forward(GetValue)
      }
    }

    class ManagerMyActor extends Actor {
      private val myTokenActor =
        context.actorOf(Props[TokenMyActor2], "toknMyActor2")
      override def receive: Receive = { case GetValue =>
        myTokenActor.forward(CalculateValue)
      }
    }

    class TokenMyActor2 extends Actor {
      import context.dispatcher
      // sender() is still the original asker, so the result goes straight back to it
      override def receive: Receive = { case CalculateValue =>
        val future = Future { "get the string" }
        future.pipeTo(sender())
      }
    }

    implicit val timeout = Timeout(3, SECONDS)
    implicit val system = ActorSystem("adasd")
    import system.dispatcher
    val myActor = system.actorOf(Props[MyActor], "myActor")
    val future = ask(myActor, GetFinalValue).mapTo[String]
    future.foreach { str =>
      println(s"got $str")
    }
This prints "got get the string".
As a final note, I'd advise against using the ask pattern within actors. The basic functionality of ask can easily be achieved with just tell and forward. The code is also shorter and not cluttered with the constant need for an implicit val timeout.
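For the case where the intermediate actor has to post-process the reply, so that forward alone is not enough, one possible approach is to carry the original requester in the messages and use plain tell. This is only a sketch under assumptions: Start, Compute, Computed, Worker, Manager and TellDemo are hypothetical names, and the upper-casing step merely stands in for whatever processing is needed.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical protocol, not part of the original code.
case object Start
final case class Compute(replyTo: ActorRef)
final case class Computed(value: String, replyTo: ActorRef)

class Worker extends Actor {
  def receive: Receive = {
    // Hand the requester back with the result so the manager keeps no state.
    case Compute(replyTo) => sender() ! Computed("get the string", replyTo)
  }
}

class Manager extends Actor {
  private val worker = context.actorOf(Props[Worker](), "worker")
  def receive: Receive = {
    // No ask and no timeout here: remember the requester inside the message.
    case Start => worker ! Compute(sender())
    // Post-process the worker's result, then answer the original requester.
    case Computed(value, replyTo) => replyTo ! value.toUpperCase
  }
}

object TellDemo {
  def run(): String = {
    implicit val timeout: Timeout = Timeout(3.seconds)
    val system = ActorSystem("tellDemo")
    try {
      val manager = system.actorOf(Props[Manager](), "manager")
      // ask is used only here, outside the actors.
      Await.result((manager ? Start).mapTo[String], 3.seconds)
    } finally system.terminate()
  }
}
```

The only timeout left is the one at the call site outside the actor system, which is where ask is still a reasonable fit.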