scalaakkatypesafe-stack

Applying custom action on SupervisorStrategy after maxNrOfRetries?


My Parent Actor looks like

case object StartRemoteProcessor

class ConnectorActor extends Actor with ActorLogging {
  override def supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 20 seconds) {
    case e: OutOfMemoryError =>
      log.error("Exception received => " + e.getMessage)
      Restart
    case e: IllegalArgumentException =>
      log.error("Exception received => " + e.getMessage)
      Restart
  }

  def receive = LoggingReceive {
    case StartRemoteProcessor =>
      val remoteProcessor = context.actorOf(Props[ProcessingActor], "processingActor")
      log.info("Starting Remote Processor")
      remoteProcessor ! "Start"
    case "ProcessingStopped" =>
      notifyFailure()
  }

  def notifyFailure() = {
    log.info("notifying failure to server")
  }
}

As per docs

The child actor is stopped if the limit is exceeded.

Requirement

On my Child Actor I have

class ProcessingActor extends Actor with ActorLogging {

  override def aroundPostRestart(reason: Throwable): Unit = self.tell("Start", context.parent)
  override def preStart(): Unit = ()
  override def postStop(): Unit = context.parent ! "ProcessingStopped"

  def receive = LoggingReceive {
    case "Start" =>
      log.info("ProcessingActor path => " + self.path)
      startProcessing()
  }

  def startProcessing() = {
    println("executing startProcessing")
    throw new IllegalArgumentException("not implemented by choice")
  }
}

But in logs, I see that notifyFailure is called on every Restart

[INFO] [07/24/2015 11:57:50.107] [main] [Remoting] Starting remoting
[INFO] [07/24/2015 11:57:50.265] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ConnectorSystem@127.0.0.1:2554]
[INFO] [07/24/2015 11:57:50.266] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://ConnectorSystem@127.0.0.1:2554]
ConnectorSystem Started
[INFO] [07/24/2015 11:57:50.277] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Starting Remote Processor
[ERROR] [07/24/2015 11:57:50.509] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.511] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
    at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
    at com.learn.remote.processing.ProcessingActor$$anonfun$receive$1.applyOrElse(ProcessingActor.scala:18)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[INFO] [07/24/2015 11:57:50.526] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.528] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.528] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
    at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
    at com.learn.remote.processing.ProcessingActor$$anonfun$receive$1.applyOrElse(ProcessingActor.scala:18)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[INFO] [07/24/2015 11:57:50.533] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.534] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.534] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
    at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
    at com.learn.remote.processing.ProcessingActor$$anonfun$receive$1.applyOrElse(ProcessingActor.scala:18)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[INFO] [07/24/2015 11:57:50.538] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.540] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.540] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
    at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
    at com.learn.remote.processing.ProcessingActor$$anonfun$receive$1.applyOrElse(ProcessingActor.scala:18)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[INFO] [07/24/2015 11:57:50.545] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server

How can I achieve this behavior?


Solution

  • The answer from daydreamer will work, but an alternative approach could be to watch the child actor from the parent, and when you receive the terminated message, execute notifyFailure

    var remoteProcessor:ActorRef = _
    
    def receive = LoggingReceive {
      case StartRemoteProcessor =>
        remoteProcessor = context.actorOf(Props[ProcessingActor], "processingActor")
        context.watch(remoteProcessor)
        log.info("Starting Remote Processor")
        remoteProcessor ! "Start"
    
      case Terminated(remoteProcessor) =>
        notifyFailure()
    }
    

    This way you don't need to customize the Actor lifecycle methods, which I find can be a rich source of bugs.