scala monix sttp

How can I close the STTP backend after completing my requests?


I am currently learning and playing around with STTP using the Monix backend. I am mainly stuck with closing the backend after all my requests (each request is a task) have been processed.

I have created sample/mock code to resemble my issue (to my understanding my problem is more general rather than specific to my code):

import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}

import scala.concurrent.duration.DurationInt

object ObservableTest extends App {

  val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val ids: Task[List[Int]] = Task { (1 to 3).toList }
    val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
    val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
    data.guarantee(backend.close()) // If I close the backend here, I can't send requests afterwards (when the actual requests in the list are processed)
    // I have attempted to return a Task containing a tuple of (data, backend) but closing the backend from outside of the scope did not work as I expected
  }
  import monix.execution.Scheduler.Implicits.global
  val obs = Observable
    .fromTask(activities)
    .flatMap { listOfFetches =>
      Observable.fromIterable(listOfFetches)
    }
    .throttle(3 second, 1)
    .map(_.runToFuture)

  obs.subscribe()
}

And my fetch (api call maker) function looks like:

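  import java.nio.ByteBuffer

  import sttp.client._
  import sttp.client.asynchttpclient.WebSocketHandler
  import sttp.model.Uri

  // Builds a description of a GET request; nothing is sent until the Task is run.
  def fetch(uri: Uri, auth: String)(implicit
      backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
  ): Task[Response[Either[String, String]]] = {
    println(uri)
    basicRequest
      .get(uri)
      .header("accept", "application/json")
      .header("Authorization", auth)
      .response(asString)
      .send()
  }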

As my main task contains other tasks, which I need to process later, I need to find an alternative way to close the Monix backend from outside. Is there a clean way to close the backend after I consume the requests in List[Task[Response[Either[String, String]]]]?


Solution

  • The problem comes from the fact that, while the sttp backend is open, you only compute a list of tasks to be performed (the List[Task[Response[Either[String, String]]]]), but you never run them. Hence, we need to sequence running these tasks before the backend closes.

    The key thing to do here is to create a single description of a task, that runs all of these requests while the backend is still open.

    Once you compute data (which is itself a task, that is, a description of a computation which, when run, yields a list of tasks, which are also descriptions of computations), we need to convert this into a single, non-nested Task. This can be done in a variety of ways (e.g. using simple sequencing), but in your case this will be using the Observable:

    AsyncHttpClientMonixBackend().flatMap { implicit backend =>
      val ids: Task[List[Int]] = Task { (1 to 3).toList }
      val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
      val data: Task[List[Task[Response[Either[String, String]]]]] =
        ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
    
      val activities = Observable
        .fromTask(data)
        .flatMap { listOfFetches =>
          Observable.fromIterable(listOfFetches)
        }
        .throttle(3 second, 1)
        .mapEval(identity)
        .completedL
    
      activities.guarantee(
        backend.close()
      )
    }
    

    First note that Observable.fromTask(...) is inside the outermost flatMap, so it is created while the backend is still open. We create the observable, throttle it and so on, and then comes the crucial step: once we have the throttled stream, we evaluate each item (each item is a Task[...], a description of how to send an HTTP request) using mapEval. We get a stream of Response[Either[String, String]] values, which are the results of the requests.
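    To see what mapEval does in isolation, here is a minimal sketch that uses only Monix. The fakeFetch function is a hypothetical placeholder for the real fetch (it is not part of sttp), and toListL is used here only so that the emitted values can be inspected.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.Await
import scala.concurrent.duration._

object MapEvalSketch {
  // Hypothetical placeholder for an HTTP call: it describes the work but runs nothing yet.
  def fakeFetch(id: Int): Task[String] = Task(s"body $id")

  def run(): List[String] = {
    val fetches: List[Task[String]] = List(1, 2, 3).map(fakeFetch)

    val collected: Task[List[String]] = Observable
      .fromIterable(fetches) // a stream whose elements are task descriptions
      .mapEval(identity)     // run each task in order and emit its result
      .toListL               // collect the emitted results so they can be inspected

    // Blocking is for demonstration only.
    Await.result(collected.runToFuture, 5.seconds)
  }
}
```

    Unlike .map(_.runToFuture) in the question, mapEval keeps the requests inside the Task description, so a finalizer attached with guarantee runs only after they have finished.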

    Finally, we convert the stream to a Task using .completedL (discarding the results), which waits until the whole stream completes.

    This final task is then sequenced with closing the backend. The final sequence of side-effects that will happen, as described above, is:

    1. open the backend
    2. create the list of tasks (data)
    3. create a stream, which throttles elements from the list computed by data
    4. evaluate each item in the stream (send requests)
    5. close the backend
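    The "simple sequencing" alternative mentioned earlier can be sketched as follows when no throttling is needed. This is a Monix-only illustration under stated assumptions: FakeBackend is a hypothetical stand-in for the sttp backend that merely records events, so the ordering of open, requests and close can be checked.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

object SequencingSketch {
  // Hypothetical stand-in for the sttp backend; it only records what happens to it.
  final class FakeBackend(log: ListBuffer[String]) {
    def send(id: Int): Task[String] = Task { log += s"request $id"; s"response $id" }
    def close(): Task[Unit] = Task { log += "close"; () }
  }

  def run(): (List[String], List[String]) = {
    val log = ListBuffer.empty[String]
    val open: Task[FakeBackend] = Task { log += "open"; new FakeBackend(log) }

    val program: Task[List[String]] = open.flatMap { backend =>
      // Nested description: a task that yields a list of tasks (nothing is sent yet).
      val data: Task[List[Task[String]]] = Task((1 to 3).toList.map(backend.send))
      // Flatten it: run the inner tasks one after another, then close the backend.
      data.flatMap(tasks => Task.sequence(tasks)).guarantee(backend.close())
    }

    // Blocking is for demonstration only.
    val responses = Await.result(program.runToFuture, 5.seconds)
    (responses, log.toList)
  }
}
```

    The recorded log shows the backend being opened first and closed last, with the three requests in between, which mirrors the sequence of side effects listed above (minus the throttling step).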