I have a use case that i have to process the request using akka fsm as soon as number of request reaches to specified value.
sealed trait State
case object Idle extends State
case object Active extends State
sealed trait Data
case object Uninitialized extends Data
case object QuickStart extends Data
case class A(a: Int) extends Data
class RequestHandlers extends FSM[State, Data] {
val queue = mutable.Queue[A]()
startWith(Idle, Uninitialized)
when(Idle) {
case Event(_, Uninitialized) =>
println("At Idle")
// self ! QuickStart
goto(Active) using QuickStart
}
when(Active) {
case Event(_, request: A) =>
println("At Active")
queue.take(2).map{x => println("request--- " + x.a + "processing")
queue.dequeue()
}
Thread.sleep(2000L)
goto(Active) using Uninitialized
}
whenUnhandled {
case Event(update: A, QuickStart) =>
queue += update
if(queue.size >= 2) {
println(s"At unhandled + ${update}" + "--" + queue)
goto(Active) using update
}
else {
println("size has not reached")
goto(Active) using Uninitialized
}
case Event(update: A, Uninitialized) =>
queue += update
println(s"At unhandled - Uninitialised + $update")
goto(Active) using QuickStart
}
initialize()
}
object demo extends App {
val actorSystem = ActorSystem("system")
val actor = actorSystem.actorOf(Props(classOf[RequestHandlers]))
val list = (1 to 10).toList
list.foreach { abc =>
actor ! Uninitialized
actor ! A(abc)
println("Sent")
}
}
I tried to using mutable queue where i am adding my request. After size of queue reaches to certain value i.e 2 process those requests simultaneously. After processing, i am dequeuing it. If i send 10 request, it will process 8 request but for last 2 it will never go to active state. I am not getting where i am making mistake while transition.
Any help will be appreciated!
I think that the minimal example of what you are doing looks like this:
// The only type of incoming message
case class Msg(a: Int)
// States
sealed trait State
case object Waiting extends State
case object Active extends State
// StateData is shared between states
case class StateData(queue: immutable.Queue[Msg])
object StateData {
val empty = StateData(immutable.Queue.empty)
def single(msg: Msg) = StateData(immutable.Queue(msg))
}
class RequestHandlers extends FSM[State, StateData] {
val startTime = System.currentTimeMillis()
def curTime = {
val time = (System.currentTimeMillis() - startTime) / 1000f
f"[$time%3.2f]"
}
startWith(Waiting, StateData.empty)
onTransition {
case Waiting -> Active =>
//use nextStateData rather than stateData !
nextStateData match {
case StateData(queue) =>
queue.foreach(x => println(s"$curTime processing ${x.a} "))
Thread.sleep(2000L)
}
}
when(Active) {
case Event(msg: Msg, _) =>
println(s"$curTime at Active $msg")
// we've just processed old data
// drop the old queue and create a new one with the new message
goto(Waiting) using StateData.single(msg)
}
when(Waiting) {
case Event(msg: Msg, StateData(oldQueue)) =>
// add an event to the queue and check if it is time to process
val newQueue = oldQueue :+ msg
println(s"$curTime at Idle $msg, newQueue = $newQueue")
if (newQueue.size == 2) {
goto(Active) using StateData(newQueue)
}
else {
stay using StateData(newQueue)
}
}
initialize()
}
and the test program is
object demo extends App {
val actorSystem = ActorSystem("system")
val actor = actorSystem.actorOf(Props(classOf[RequestHandlers]))
(1 to 10).toList.foreach { i =>
println(s"Send $i")
actor ! Msg(i)
}
}
The logic of RequestHandlers
is that it accumulates incomming requests in a queue stored inside a StateData
object (which has only one type that is shared between both states). There are two states Waiting
and Active
. The processing actually happens on the transition Waiting
-> Active
. Probably the most tricky point is to not forget that when FSM is in the Active
state, new messages will arrive and should be processed by adding to a queue (or rather starting a new queue with the data from that message).
P.S. Well, this example is probably not that minimal. In fact you could have just one state and do the processing inside if (newQueue.size == 2)
but that would be a quite strange FSM.