scala throttling akka-fsm

How to make a size-based throttler using Akka FSM?


I have a use case where I need to process requests with Akka FSM as soon as the number of queued requests reaches a specified value.

import akka.actor.{ActorSystem, FSM, Props}
import scala.collection.mutable

sealed trait State
case object Idle extends State
case object Active extends State

sealed trait Data
case object Uninitialized extends Data
case object QuickStart extends Data
case class A(a: Int) extends Data

class RequestHandlers extends FSM[State, Data] {
  val queue = mutable.Queue[A]()
  startWith(Idle, Uninitialized)

  when(Idle) {
    case Event(_, Uninitialized) =>
      println("At Idle")
      //      self ! QuickStart
      goto(Active) using QuickStart
  }

  when(Active) {
    case Event(_, request: A) =>
      println("At Active")
      queue.take(2).map { x =>
        println("request---  " + x.a + "processing")
        queue.dequeue()
      }

      Thread.sleep(2000L)
      goto(Active) using Uninitialized
  }


  whenUnhandled {
    case Event(update: A, QuickStart) =>
      queue += update
      if(queue.size >= 2) {
        println(s"At unhandled + ${update}" + "--" + queue)
        goto(Active) using update
      }
      else {
        println("size has not reached")
        goto(Active) using Uninitialized
      }
    case Event(update: A, Uninitialized) =>
      queue += update
      println(s"At unhandled - Uninitialised + $update")
      goto(Active) using QuickStart
  }

  initialize()

}

object demo extends App  {

  val actorSystem = ActorSystem("system")
  val actor = actorSystem.actorOf(Props(classOf[RequestHandlers]))

  val list = (1 to 10).toList
  list.foreach { abc =>

    actor ! Uninitialized
    actor ! A(abc)
    println("Sent")
  }

}

I tried using a mutable queue to which I add each request. Once the size of the queue reaches a certain value (here 2), those requests should be processed together, and I dequeue them after processing. If I send 10 requests, 8 of them are processed, but for the last 2 the FSM never goes to the Active state. I cannot see where I am making a mistake in the transitions.

Any help will be appreciated!


Solution

  • I think that the minimal example of what you are doing looks like this:

    import akka.actor.{ActorSystem, FSM, Props}
    import scala.collection.immutable
    
    // The only type of incoming message
    case class Msg(a: Int)
    
    // States
    sealed trait State
    case object Waiting extends State
    case object Active extends State
    
    // StateData is shared between states
    case class StateData(queue: immutable.Queue[Msg])
    object StateData {
      val empty = StateData(immutable.Queue.empty)
    
      def single(msg: Msg) = StateData(immutable.Queue(msg))
    }
    
    
    class RequestHandlers extends FSM[State, StateData] {
      val startTime = System.currentTimeMillis()
    
      def curTime = {
        val time = (System.currentTimeMillis() - startTime) / 1000f
        f"[$time%3.2f]"
      }
    
      startWith(Waiting, StateData.empty)
    
      onTransition {
        case Waiting -> Active =>
          // use nextStateData rather than stateData:
          // during a transition, stateData still holds the old data
          nextStateData match {
            case StateData(queue) =>
              queue.foreach(x => println(s"$curTime processing ${x.a} "))
              Thread.sleep(2000L)
          }
      }
    
      when(Active) {
        case Event(msg: Msg, _) =>
          println(s"$curTime at Active $msg")
          // we've just processed old data
          // drop the old queue and create a new one with the new message
          goto(Waiting) using StateData.single(msg)
      }
      when(Waiting) {
        case Event(msg: Msg, StateData(oldQueue)) =>
          // add an event to the queue and check if it is time to process
          val newQueue = oldQueue :+ msg
          println(s"$curTime at Waiting $msg, newQueue = $newQueue")
          if (newQueue.size == 2) {
            goto(Active) using StateData(newQueue)
          }
          else {
            stay using StateData(newQueue)
          }
      }
    
      initialize()
    }
    

    and the test program is

    object demo extends App  {
    
        val actorSystem = ActorSystem("system")
        val actor = actorSystem.actorOf(Props(classOf[RequestHandlers]))
    
        (1 to 10).toList.foreach { i =>
          println(s"Send $i")
          actor ! Msg(i)
        }
    
    }
    

    The logic of RequestHandlers is that it accumulates incoming requests in a queue stored inside a StateData object (a single data type shared by both states). There are two states, Waiting and Active. The processing actually happens on the Waiting -> Active transition. Probably the trickiest point is to remember that new messages still arrive while the FSM is in the Active state, and they must be handled by adding them to a queue (or rather by starting a new queue with the data from that message).
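To make the transition logic described above easier to check, here is a minimal sketch that models it as a pure function, without Akka. The names `ThrottlerModel`, `step`, `run` and the `batchSize` parameter are hypothetical and exist only for this illustration. The sketch also uses `>=` where the actor above uses `== 2`. It only mirrors which messages end up in which batch and does not reproduce timing or the `Thread.sleep` call.

```scala
import scala.collection.immutable.Queue

object ThrottlerModel {
  final case class Msg(a: Int)

  sealed trait State
  case object Waiting extends State
  case object Active extends State

  // One step of the FSM: returns the next state, the next queue, and the
  // batch (if any) that the Waiting -> Active transition would process.
  def step(state: State, queue: Queue[Msg], msg: Msg, batchSize: Int)
      : (State, Queue[Msg], Option[Queue[Msg]]) =
    state match {
      case Waiting =>
        val newQueue = queue :+ msg
        if (newQueue.size >= batchSize) (Active, newQueue, Some(newQueue))
        else (Waiting, newQueue, None)
      case Active =>
        // The old queue was already processed on the transition,
        // so start a new queue with the incoming message.
        (Waiting, Queue(msg), None)
    }

  // Feed all messages through the model and collect the processed batches.
  def run(msgs: Seq[Msg], batchSize: Int): Vector[List[Int]] = {
    val start: (State, Queue[Msg], Vector[List[Int]]) =
      (Waiting, Queue.empty[Msg], Vector.empty[List[Int]])
    val (_, _, batches) = msgs.foldLeft(start) {
      case ((state, queue, done), msg) =>
        val (nextState, nextQueue, batch) = step(state, queue, msg, batchSize)
        (nextState, nextQueue, done ++ batch.map(_.toList.map(_.a)))
    }
    batches
  }
}
```

Calling `ThrottlerModel.run((1 to 10).map(ThrottlerModel.Msg(_)), 2)` yields five batches of two, so in this model no message is left behind, including the last two.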

    P.S. Well, this example is probably not that minimal. In fact, you could have just one state and do the processing inside if (newQueue.size == 2), but that would be a rather strange FSM.
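For comparison, the single-state variant mentioned in the P.S. might look roughly like the sketch below. It assumes the classic `akka-actor` module is on the classpath. The names `Collecting`, `Batching`, `SingleStateHandler` and the `batchSize` constructor parameter are hypothetical and not part of the code above. Processing is reduced to a `println`.

```scala
import akka.actor.FSM
import scala.collection.immutable.Queue

final case class Msg(a: Int)

// A single state: the FSM only ever collects messages.
sealed trait State
case object Collecting extends State

object Batching {
  // Append msg; if the batch is full, hand it back for processing
  // and continue with an empty queue.
  def add(queue: Queue[Msg], msg: Msg, batchSize: Int): (Queue[Msg], Option[Queue[Msg]]) = {
    val newQueue = queue :+ msg
    if (newQueue.size >= batchSize) (Queue.empty[Msg], Some(newQueue))
    else (newQueue, None)
  }
}

class SingleStateHandler(batchSize: Int) extends FSM[State, Queue[Msg]] {
  startWith(Collecting, Queue.empty[Msg])

  when(Collecting) {
    case Event(msg: Msg, queue) =>
      val (nextQueue, batch) = Batching.add(queue, msg, batchSize)
      // Process the batch inline instead of on a state transition.
      batch.foreach(_.foreach(m => println(s"processing ${m.a}")))
      stay().using(nextQueue)
  }

  initialize()
}
```

Such an actor could be created with `actorSystem.actorOf(Props(new SingleStateHandler(2)))` and fed `Msg` values in the same way as in the test program. Because there is only one state, the FSM machinery adds little here, which is the point the P.S. makes.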