What is the proper way to handle Futures from inside an Akka (typed) Actor?
For example, assume there is an Actor OrderActor
that receives Commands to place orders... which it does by making an http call to an external service. Since these are http calls to an external service, Future
s are involved. So, what is the right way to handle that Future
from within the Actor.
I read something about the pipeTo pattern. Is that what needs to happen here or something else?
class OrderActor(context: ActorContext[OrderCommand], orderFacade: OrderFacade)
extends AbstractBehavior[OrderCommand](context) {
context.log.info("Order Actor started")
override def onMessage(msg: OrderCommand): Behavior[OrderCommand] = {
msg match {
case PlaceOrder(
referenceId: OrderReferenceId,
ticker: Ticker,
quantity: Int,
replyTo: ActorRef[OrderResult]
) =>
orderFacade
.placeOrder(ticker, quantity) //this returns a Future
.map(res => {
//transform result
//book keeping / notification (affects state)
replyTo ! transformed
//Can/Should we map like this? I tried adding a log statement in here, but I never see it... and the replyTo doesnt seem to get the message.
})
this
It's generally best to avoid doing Future
transformations (map
, flatMap
, foreach
, etc.) inside an actor. There's a distinct risk that some mutable state within the actor isn't what you expect it to be when the transformation runs. In Akka Classic, perhaps the most pernicious form of this would result in sending a reply to the wrong actor.
Akka Typed (especially in the functional API) reduces a lot of the mutable state which could cause trouble, but it's still generally a good idea to pipe the Future
as a message to the actor.
So if orderFacade.placeOrder
results in a Future[OrderResponse]
, you might add subclasses of OrderCommand
like this
// also include fields from the PlaceOrder which will be useful
case class OrderResponseIs(resp: OrderResponse, replyTo: ActorRef[OrderResult]) extends OrderCommand
// TODO include fields
case class OrderFailed() extends OrderCommand
And then pipe the Future
to yourself with:
import scala.util.{ Failure, Success }
context.pipeToSelf(orderFacade.placeOrder) {
case Success(resp) => OrderResponseIs(resp, replyTo)
case Failure(_) => OrderFailed()
}
You then have to handle those messages:
case OrderResponseIs(resp, replyTo) =>
// transform resp
val transformed = ???
replyTo ! transformed
this
case OrderFailed() =>
context.log.warning("Stuff is broken")
this
There's not actually much overhead in doing this versus map
and friends (both will typically involve scheduling a task for asynchronous execution on the dispatcher).