scala reactive-programming monix

How can I throttle sending HTTP get requests via Monix?


Building on my earlier question, and with insights from Artem, my objective is to send GET requests to a given URL and use Monix's throttling feature to space out the requests (to avoid hitting rate limits).

The expected workflow looks something like:

make 1 (or more) API call(s) -> apply back-pressure/pausing (based on throttle) -> make the next request -> and so on.

This is what I have tried so far (below is a simplified snippet of my actual code):

import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}
import scala.concurrent.duration.DurationInt

object ObservableTest extends App {

  val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val ids: Task[List[Int]] = Task { (1 to 3).toList }
    val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
    val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
    data.guarantee(backend.close())
  }

  import monix.execution.Scheduler.Implicits.global

  val flat: Unit = activities.runToFuture.foreach { x =>
    val r: List[Task[Response[Either[String, String]]]] = x // List with size 3
    Observable
      .fromIterable(r)
      .throttle(6 second, 1)
      .map(_.runToFuture)
      .subscribe()
  }
  while (true) {}
}
  

And this is what the function for fetching the data looks like:

  def fetch(uri: Uri, auth: String)(implicit
      backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
  ) = {
    println(uri)
    val task = basicRequest
      .get(uri)
      .header("accept", "application/json")
      .header("Authorization", auth)
      .response(asString)
      .send()

    task
  }

I have run the code above, and all the GET requests are still fired without any spacing in between.

For illustration, my current API call logs look something like:

//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)

And I am trying to achieve something similar to:

//(https://mr.foos.api/v1), Sat Aug 08 18:50:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:18 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:21 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:24 CEST 2020)


Solution

  • So, if I have understood correctly, you have types like this:

    object ObservableTest extends App  {
      type Response = Either[ResponseError[Error], Activities]
      case class Activities()
      val activities: Task[List[Response]] = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
        def fetchData(uri: String): Task[Response] = ???
        val ids: Task[List[Int]] = ??? // a Task containing a List of IDs (from a previous step)
        val func: String => Task[Response] = (i: String) => fetchData("someUri") // a function that will be used to call an endpoint
        val data: Task[List[Task[Response]]] = ids map (_ map (id => func(id.toString))) // Maps API calling-function to the ids
        val activitiesData: Task[List[Response]] = data.flatMap(Task.parSequenceUnordered(_)) // Flattenned the previous step
        activitiesData.guarantee(backend.close())
      }
      import monix.execution.Scheduler.Implicits.global
      Observable(activities)
        .throttle(3 second, 1)
        .subscribe()
    }
    

    The problem in your code is that you throttle one big Task that contains multiple actions, some of which even run in parallel (although that is not the root of the problem). You can see this in the types: you should build the Observable from a list of tasks (each of which is then throttled), not from a task of a list.
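As a minimal sketch of that difference (the object name `ElementCountSketch` and the stand-in tasks are hypothetical, and no HTTP is involved), counting the elements of each Observable suggests why the throttle has nothing to space out in the first case: wrapping the combined Task yields a single element, while `fromIterable` over the list yields one element per request.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

object ElementCountSketch {
  // Hypothetical stand-ins for the HTTP calls; each task just returns its id.
  val fetches: List[Task[Int]] = List(1, 2, 3).map(i => Task(i))

  // All fetches folded into one Task, as in the snippet above.
  val bigTask: Task[List[Int]] = Task.parSequenceUnordered(fetches)

  // Observable(bigTask) emits exactly one element: the whole Task.
  def elementsInBigTaskStream(): Long =
    Observable(bigTask).countL.runSyncUnsafe()

  // Observable.fromIterable(fetches) emits one element per fetch.
  def elementsInPerFetchStream(): Long =
    Observable.fromIterable(fetches).countL.runSyncUnsafe()
}
```

A throttle of "1 element per 3 seconds" can only delay elements, so it has an effect on the second stream but not on the first.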

    I don't actually know where the ids come from, and that could be the cornerstone of the evaluation pipeline. But if we have a Task containing them, as in the example, we can do this:

    import monix.eval.Task
    import sttp.client.asynchttpclient.monix._
    import monix.eval.Task._
    import monix.reactive.Observable
    import sttp.client.ResponseError
    
    import scala.concurrent.duration.DurationInt
    
    object ObservableTest extends App  {
      type Response = Either[ResponseError[Error], Activity]
      case class Activity()
      val activities: Task[List[Task[Response]]] = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
        def fetchData(uri: String): Task[Response] = Task {
          println("mocked http request")
          Right(Activity())
        }
        val ids: Task[List[Int]] = Task { (1 to 100).toList} // a Task containing a List of IDs (from a previous step)
        val func: Int => Task[Response] = (i: Int) => fetchData(s"someUri_$i") // a function that will be used to call an endpoint
        val data: Task[List[Task[Response]]] = ids.map(_.map(func)) // Maps API calling-function to the ids
        data.guarantee(backend.close())
      }
      import monix.execution.Scheduler.Implicits.global
    
      Observable.fromTask(activities)
        .flatMap { listOfFetches: List[Task[Response]]  =>
          Observable.fromIterable(listOfFetches)
        }
        .throttle(3.second, 1)
        .map(_.runToFuture) 
        .subscribe()
      
      while(true) {}
    }
    

    We throttle a list of fetches, not the task that does all fetches inside.
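A possible variation, sketched under assumptions: instead of `.map(_.runToFuture)`, the throttled tasks can be run inside the stream with `mapEval`, and the whole stream can be turned into one Task that is awaited. The names `ThrottledFetchSketch`, `fakeFetch` and `runAll`, as well as the 200 ms period, are hypothetical, and the HTTP call is mocked. It assumes a Monix version that provides `Observable.throttle`, as the code above does.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._

object ThrottledFetchSketch {
  // Hypothetical stand-in for an HTTP call: logs when it runs and returns its id.
  def fakeFetch(id: Int): Task[Int] = Task {
    println(s"fetch $id at ${java.time.LocalTime.now()}")
    id
  }

  val fetches: List[Task[Int]] = (1 to 3).toList.map(fakeFetch)

  // throttle lets at most one task through per period; mapEval then runs that
  // task inside the stream, one at a time and in order.
  val results: Task[List[Int]] =
    Observable
      .fromIterable(fetches)
      .throttle(200.millis, 1)
      .mapEval(task => task)
      .toListL

  // Blocks the calling thread until every fetch has run.
  def runAll(): List[Int] = results.runSyncUnsafe()
}
```

Calling `ThrottledFetchSketch.runAll()` returns once all fetches are done, so the `while(true) {}` loop is not needed in this variant. With a real sttp backend, the backend would also have to stay open until the stream completes; in the code above, `guarantee(backend.close())` runs as soon as the list of tasks has been built, before any of those tasks is run.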

    PS: Please ask about anything that is unclear, and I will add comments to the code.