scalaclassakkaactorunbounded-wildcard

Pass a wildcard class as part of actor message with Akka actors?


I am trying to simplify the handling of messages to actors in my application. It has become quite large and messy, so I'm refactoring how I pass around actor references.

The plan is to:

This would mean I would only need to pass around a single actor reference in my application.

So far, I have this (it is unfinished):

package core

import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill}
import akka.pattern.ask
import akka.util.Timeout
import core.ActorManager.{AskTimeout, Forward, RegisterActor}

import scala.concurrent.Await

object ActorManager {
  case object Command
  case class Forward(actorName: String, command: Command)

  case class AskTimeout(actorName: String, command: Command, timeout: Timeout, classRef: Class[?], blocking: Boolean = false)

  case class RegisterActor(actorRef: ActorRef, name: String)
}

class ActorManager extends Actor {

  private val system: ActorSystem = context.system

  private var classicActors: Map[String, ActorRef] = Map.empty

  override def receive: Receive = {
    case RegisterActor(actorRef, name) =>
      if (classicActors.contains(name)) {
        system.log.warning(s"ActorSystem with name $name already registered. It will be overwritten.")
      }
      classicActors += name -> actorRef

    case ft@Forward(name, command) =>
      val classicActor = classicActors.get(name)
      if (classicActor.isDefined) {
        classicActor.get.forward(command)
      } else {
        system.log.error(s"Actor by the name of: $name is not registered. Command [${ft.toString}] was not processed.")
      }

    case at@AskTimeout(name, command, timeout, classRef, blocking) =>
      val classicActor = classicActors.get(name)

      implicit val actorTimeout: Timeout = timeout

      if (classicActor.isDefined) {
        if (blocking) {
          val result = Await.result((classicActor.get ? command).mapTo[classRef], timeout.duration)
        } else {

        }
      } else {
        system.log.error(s"Actor by the name of: $name is not registered. Command [${at.toString}] was not processed.")
      }

    case PoisonPill =>
      // shutdown all actors

  }

}

I seem to have a problem with the classRef field. The ask pattern needs to know what type of class to map the response into, which should be configurable at the point of creating the message to pass to this actor.

I tried:

  case class AskTimeout[T](actorName: String, command: Command, timeout: Timeout, blocking: Boolean = false)

...

  override def receive: Receive = {
    case at@AskTimeout[T](name, command, timeout, blocking) =>
      val classicActor = classicActors.get(name)

      implicit val actorTimeout: Timeout = timeout

      if (classicActor.isDefined) {
        if (blocking) {
          val result = Await.result((classicActor.get ? command).mapTo[T], timeout.duration)
        } else {

        }
      } else {
        system.log.error(s"Actor by the name of: $name is not registered. Command [${at.toString}] was not processed.")
      }

  }

}

But it seems I cannot pass the class reference this way either.

Is my approach completely wrong here or is there a correct way of achieving this?


Solution

  • I have found a solution.

    One of the issues was the use of Class[?] which I have replaced with Class[_]

    Now, when the actor receives a AskTimeout, it explicitly casts the result to the provided class type:

        case at@AskTimeout(name, command, timeout, classRef, blocking) =>
          val classicActor = classicActors.get(name)
    
          implicit val actorTimeout: Timeout = timeout
    
          if (classicActor.isDefined) {
            val future = classicActor.get ? command
            if (blocking) {
              val result = Await.result(future, timeout.duration)
              if (!classRef.isInstance(result)) {
                throw new ClassCastException(s"Response is not of type ${classRef.getName}")
              }
              val casted = classRef.cast(result)
              sender() ! casted
            } else {
              future.foreach { result =>
                if (classRef.isInstance(result)) {
                  val casted = classRef.cast(result)
                  sender() ! casted
                } else {
                  system.log.error(s"Received response not matching type ${classRef.getName}")
                }
              }(context.dispatcher)
            }
          } else {
            system.log.error(s"Actor by the name of: $name is not registered. Command [${at.toString}] was not processed.")
          }