For a University assignment, I have to implement a simulation of the Raft protocol in Akka (I am using Akka typed, using Behaviors).
In the Raft protocol, interactions between actors have a 1:1 mapping between a request and a response; responses must be delivered in a timely manner.
Therefore, it makes sense to use the ask
pattern as demonstrated by the documentation in the Request-Response with ask between two actors example.
In my implementation, requests and responses must be context-aware: this means that, when an actor that performed a query receives a response, it must know what query the response was for. The example in the documentation suggests to include a query ID in the message.
What I need to solve can be described with the following example:
I think that, to "filter" replies that do not have a correct query ID, I can put a BehaviorInterceptor in actor A that checks that the ID in the reply matches the expected query ID.
To summarize:
Moreover, I don't understand whether ask
is blocking or not.
Ideally, I would like to use ask
in a non-blocking way: actor A ask
s actor B, and, while waiting for B's reply, A can do other operations.
While waiting for B's reply, actor A can also change its behavior if needed (also a Behavior that does not handle B's replies).
Thank you for any insight!
An ask between two actors (using the ActorContext
) is non-blocking.
Since the high watermark of the requests to a given target is an important part of protocol state for the actor, I would just store it in the asking actor's state (e.g. in Scala a Map[ActorRef[Request], Int]
). The adapted response contains the target and the id it's in response to (you define how this is incorporated when performing the ask); when receiving the adapted response, the first thing is comparing the id in the response to the high watermark for the target.
In Scala, for example:
sealed trait RequestA
case class QueryB(target: ActorRef[RequestB]) extends RequestA
case class ResponseFromB(target: ActorRef[RequestB], id: Int, resp: ResponseB) extends RequestA
case class BTimedOut(target: ActorRef[RequestB], id: Int) extends RequestA
sealed trait RequestB
def buildRequestB(id: Int)(replyTo: ResponseB): RequestB = ???
sealed trait ResponseB
def aBehavior(highWater: Map[ActorRef[RequestB], Int]): Behavior[RequestA] =
Behaviors.receive { (context, msg) =>
case QueryB(target) =>
implicit val timeout: Timeout = 10.seconds
val nextHighwater = highWater.get(target).map(_ + 1).getOrElse(0)
// request is sent and received "in the background"
context.ask(target, buildRequestB(nextHighwater)) {
case Success(resp) => ResponseFromB(target, nextHighwater, resp)
case Failure(_) => BTimedOut(target, nextHighwater)
}
aBehavior(highWater + (target -> nextHighwater))
case ResponseFromB(target, id, resp) =>
if (highWater.get(target).contains(id)) {
context.log.info("Accepting response: {}", resp)
Behaviors.same
} else {
context.log.info("Ignoring response: {}", resp)
Behaviors.same
}
case BTimedOut(target, id) =>
context.log.warning("Ask of {} (sequence ID {}) timed out", target, id)
Behaviors.same
}