Tags: scala, akka, command-line-interface, akka-remote-actor, akka-remoting

How can I send messages to a remote actor via CLI with Akka remoting?


I have a remote actor, Bar, and a local actor, Foo. I want to use Foo to pass a message to Bar on each invocation of a CLI.

Bar receives messages successfully, but Foo hangs afterwards, waiting for further messages. To fix this, I added sys.exit(0) at the end of Foo's main. That stops the hang, but it causes an association issue with Foo's system and Bar no longer receives the message.

How can I shut down my local actor after each CLI invocation, without having to kill it manually?

Shut up and give me the code!


Foo:

build.sbt

name := "Foo"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11"
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11"
libraryDependencies += "com.github.scopt" %% "scopt" % "3.5.0"

fork in run := true

Main.scala

import akka.actor._
import com.typesafe.config.ConfigFactory

case class Config(mode: String = "", greeting: String="")

class Foo extends Actor {
  // look up the remote actor by its path
  val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")

  def receive = {
    case method: String => BarActor ! method
  }
}

object CommandLineInterface {

  val config = ConfigFactory.load()
  val system = ActorSystem("FooSystem", config.getConfig("FooApp"))

  val FooActor = system.actorOf(Props[Foo], name = "FooActor")

  val parser = new scopt.OptionParser[Config]("Foo") {
    head("foo", "1.x")

    help("help").text("prints usage text")

    opt[String]('m', "method").action( (x, c) =>
      c.copy(greeting = x) ).text("Bar will greet with <method>")
  }
}

object Main extends App {
  import CommandLineInterface.{parser, FooActor}

  parser.parse(args, Config()) match {
    case Some(config) => FooActor ! config.greeting
    case None => sys.error("Bad news...")
  }
  /*
    With sys.exit(0) commented out, this hangs and Bar greets.
    With sys.exit(0) uncommented, this doesn't hang, but Bar doesn't greet either.
   */
 
  //sys.exit(0)
}

application.conf

FooApp {
  akka {
    loglevel = "INFO"
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 0
      }
      log-sent-messages = on
      log-received-messages = on
    }
  }
}

Bar:

build.sbt

name := "Bar"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11"
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11"

Main.scala

import akka.actor._
import com.typesafe.config.ConfigFactory

class Bar extends Actor {
  def receive = {
    case greeting: String => Bar.greet(greeting)
  }
}

object Bar {
  val config = ConfigFactory.load()
  val system = ActorSystem("BarSystem", config.getConfig("BarApp"))
  val BarActor = system.actorOf(Props[Bar], name = "BarActor")

  def greet(greeting: String) = println(greeting)

  def main(args: Array[String]): Unit = {
    /* Intentionally empty */
  }
}

application.conf

BarApp {
  akka {
    loglevel = "INFO"
    actor {
      provider = remote
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2552
      }
      log-sent-messages = on
      log-received-messages = on
    }
  }
}

Run Foo with sbt 'run-main Main -m hello', and run Bar with sbt 'run-main Main'.

Sorry for the long code, but it's the MVCE for my problem.

How can I achieve my desired behavior, where the CLI actor dies after each CLI invocation while the remote actor keeps waiting for new messages?


Solution

  • This is happening because you call sys.exit(0) immediately after sending a message to FooActor, so there's a significant chance that the application exits before FooActor gets the chance to even read the message, let alone forward it to BarActor.

    An obvious first attempt is to have Foo stop itself right after forwarding the message, and to terminate the actor system once Foo has stopped:

    class Foo extends Actor {
      // look up the remote actor by its path
      val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")
    
      override def receive = {
        case method: String => {
          BarActor ! method
          self ! PoisonPill
        }
      }
    
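      // Explicit Unit result type: terminate() returns a Future, which would
      // otherwise be inferred as the (incompatible) result type of postStop.
      override def postStop(): Unit = {
        context.system.terminate()
      }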
    }
    

    Unfortunately, this does not work either: `!` returns immediately and the remote send happens asynchronously, so the system still gets shut down before the message is dispatched to Bar.

    I couldn't find a reasonable solution to this issue if the message has to be sent in a "fire and forget" style. In most cases, however, you want some kind of response from the remote actor anyway. Foo can then wait for Bar's reply before stopping itself, with a scheduled PoisonPill acting as a timeout in case no reply arrives:

    class Foo extends Actor {
      // look up the remote actor by its path
      val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")
    
      override def receive = {
        case method: String => {
          BarActor ! method
          context.become(waitingToKillMyself)
        }
      }
    
      def waitingToKillMyself: Receive = {
        case response: String => {
          println(response)
          self ! PoisonPill
        }
      }
    
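      // Explicit Unit result type: terminate() returns a Future, which would
      // otherwise be inferred as the (incompatible) result type of postStop.
      override def postStop(): Unit = {
        context.system.terminate()
      }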
    }
    
    // ...
    
    object Main extends App {
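      import CommandLineInterface.{parser, FooActor, system}
      import system.dispatcher // execution context used by the scheduler
      import scala.concurrent.duration._ // provides 10.seconds
    
      parser.parse(args, Config()) match {
        case Some(config) => {
          FooActor ! config.greeting
          // Timeout: stop Foo (and with it the system) if Bar has not replied within 10 seconds
          system.scheduler.scheduleOnce(10.seconds, FooActor, PoisonPill)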
        }
    
        case None => sys.error("Bad news...")
      }
    }
    

    Bar:

    class Bar extends Actor {
      def receive = {
        case greeting: String => {
          Bar.greet(greeting)
          sender() ! "OK"
        }
      }
    }
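
The same wait-for-a-reply idea could also be written with Akka's ask pattern instead of context.become. This is a different technique from the one in the answer above, shown only as a minimal sketch. It assumes Bar replies to the sender as in the modified Bar above and that the FooApp section of application.conf is unchanged. The names AskAndExit, sendAndAwait and AskMain are hypothetical, and reading the greeting from the first argument stands in for the scopt parser.

```scala
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object AskAndExit {
  // Hypothetical helper: send `msg` to the actor at `path` and block until
  // it replies or `wait` has elapsed. A timeout ends up as a Failure.
  def sendAndAwait(system: ActorSystem, path: String, msg: Any,
                   wait: FiniteDuration): Try[Any] = {
    implicit val timeout: Timeout = Timeout(wait)
    Try(Await.result(system.actorSelection(path) ? msg, wait))
  }
}

object AskMain extends App {
  // Assumption: same "FooApp" config section as in the question.
  val system = ActorSystem("FooSystem", ConfigFactory.load().getConfig("FooApp"))
  val barPath = "akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor"

  // Simplification: take the greeting from the first argument.
  val greeting = args.headOption.getOrElse("hello")

  AskAndExit.sendAndAwait(system, barPath, greeting, 10.seconds) match {
    case Success(reply) => println(reply)
    case Failure(e)     => Console.err.println(s"No reply from Bar: $e")
  }

  // Shut down only after the reply (or the timeout), so the CLI can exit.
  Await.result(system.terminate(), 10.seconds)
}
```

With this shape no Foo actor is needed at all: the main thread blocks until Bar has acknowledged the message, so the system is only terminated once the message is known to have arrived or the timeout has passed.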