I am currently learning and playing around with STTP using the Monix backend. I am mainly stuck with closing the backend after all my requests (each request is a task) have been processed.
I have created sample/mock code to resemble my issue (to my understanding my problem is more general rather than specific to my code):
    import sttp.client.asynchttpclient.monix._
    import monix.eval.Task
    import monix.reactive.Observable
    import sttp.client.{Response, UriContext}
    import scala.concurrent.duration.DurationInt

    object ObservableTest extends App {

      val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
        val ids: Task[List[Int]] = Task { (1 to 3).toList }
        val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
        val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
        data.guarantee(backend.close()) // If I close the backend here, I can't generate requests afterwards (when processing the actual requests in the list)
        // I have attempted to return a Task containing a tuple of (data, backend), but closing the backend from outside of the scope did not work as I expected
      }

      import monix.execution.Scheduler.Implicits.global

      val obs = Observable
        .fromTask(activities)
        .flatMap { listOfFetches =>
          Observable.fromIterable(listOfFetches)
        }
        .throttle(3.seconds, 1)
        .map(_.runToFuture)

      obs.subscribe()
    }
My fetch function, which makes the API call, looks like this:
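    // Imports needed in addition to the ones in the snippet above
    import java.nio.ByteBuffer
    import sttp.client.{SttpBackend, asString, basicRequest}
    import sttp.client.asynchttpclient.WebSocketHandler
    import sttp.model.Uri

    def fetch(uri: Uri, auth: String)(implicit
        backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
    ) = {
      println(uri)
      // Only describes the request; nothing is sent until the Task is run
      basicRequest
        .get(uri)
        .header("accept", "application/json")
        .header("Authorization", auth)
        .response(asString)
        .send()
    }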
As my main task contains other tasks, which I later need to process, I need to find an alternative way to close the Monix backend from outside. Is there a clean way to close the backend after I consume the requests in List[Task[Response[Either[String, String]]]]?
The problem comes from the fact that, with the sttp backend open, you are computing a list of tasks to be performed (the List[Task[Response[Either[String, String]]]]), but you are not running them. Hence, we need to sequence running these tasks before the backend closes.
The key thing to do here is to create a single description of a task that runs all of these requests while the backend is still open.
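As a minimal sketch of that idea, without any throttling, the nested tasks can be flattened with Task.sequence and only then combined with the close step. To keep the example runnable without a server, FakeBackend and fakeFetch are hypothetical stand-ins for the sttp backend and for fetch; they are not part of sttp or of the question's code.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

object SequencingSketch {
  // Records side effects in the order in which they actually happen.
  val log: ListBuffer[String] = ListBuffer.empty

  // Hypothetical stand-in for the sttp backend: only close() is modelled.
  final class FakeBackend {
    def close(): Task[Unit] = Task { log += "close"; () }
  }

  // Hypothetical stand-in for fetch: describes a request, runs nothing yet.
  def fakeFetch(id: Int): Task[String] =
    Task { log += s"request $id"; s"response $id" }

  val program: Task[List[String]] =
    Task(new FakeBackend).flatMap { backend =>
      // A description that, when run, yields more descriptions.
      val data: Task[List[Task[String]]] = Task((1 to 3).toList.map(fakeFetch))
      // Flatten: run the outer task, then the inner tasks one after another.
      val all: Task[List[String]] = data.flatMap(tasks => Task.sequence(tasks))
      // close() runs only after all requests have completed or failed.
      all.guarantee(backend.close())
    }

  def run(): List[String] = {
    log.clear()
    Await.result(program.runToFuture, 5.seconds)
  }
}
```

Calling SequencingSketch.run() should leave the three request entries in log before the close entry, because guarantee is attached to the task that already includes running the requests.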
Once you compute data (which itself is a task, a description of a computation, which, when run, yields a list of tasks, also descriptions of computations), we need to convert this into a single, non-nested Task. This can be done in a variety of ways (e.g. using simple sequencing), but in your case this will be done using the Observable:
    AsyncHttpClientMonixBackend().flatMap { implicit backend =>
      val ids: Task[List[Int]] = Task { (1 to 3).toList }
      val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
      val data: Task[List[Task[Response[Either[String, String]]]]] =
        ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))

      val activities = Observable
        .fromTask(data)
        .flatMap { listOfFetches =>
          Observable.fromIterable(listOfFetches)
        }
        .throttle(3.seconds, 1)
        .mapEval(identity) // runs each request Task as it passes through the stream
        .completedL

      activities.guarantee(
        backend.close()
      )
    }
First, note that Observable.fromTask(...) is inside the outermost flatMap, so it is created while the backend is still open. We create the observable, throttle it, etc., and then comes the crucial step: once we have the throttled stream, we evaluate each item (each item is a Task[...], a description of how to send an HTTP request) using mapEval. We get a stream of Response[Either[String, String]] values, which are the results of the requests.
Finally, we convert the stream to a Task using .completedL (discarding the results), which waits until the whole stream completes.
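This final task is then sequenced with closing the backend. The final sequence of side effects that will happen, as described above, is:
1. open the backend
2. create the list of tasks (data)
3. create a stream that throttles the elements of the list computed by data
4. evaluate each item in the stream (send the requests)
5. close the backend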
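The ordering of side effects described above can be checked without a real HTTP server. The sketch below is an illustration only: fakeFetch and the recorded event log are hypothetical stand-ins for fetch and for the sttp backend, and the throttle period is shortened to 100 milliseconds. It also uses toListL instead of completedL, which is one way to keep the responses rather than discard them.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

object ThrottledSketch {
  // Event log, synchronized because throttling introduces async boundaries.
  private val events = ListBuffer.empty[String]
  private def record(e: String): Unit = events.synchronized { events += e; () }
  def recorded: List[String] = events.synchronized(events.toList)

  // Hypothetical stand-in for fetch: a description of a request.
  def fakeFetch(id: Int): Task[String] =
    Task { record(s"request $id"); s"response $id" }

  val program: Task[List[String]] =
    Task { record("open"); () }.flatMap { _ =>
      val data: Task[List[Task[String]]] = Task((1 to 3).toList.map(fakeFetch))
      Observable
        .fromTask(data)
        .flatMap(listOfFetches => Observable.fromIterable(listOfFetches))
        .throttle(100.millis, 1) // at most one element per period
        .mapEval(identity)       // run each request as it passes through
        .toListL                 // collect the responses instead of discarding them
        .guarantee(Task { record("close"); () })
    }

  def run(): List[String] = {
    events.synchronized(events.clear())
    Await.result(program.runToFuture, 5.seconds)
  }
}
```

After ThrottledSketch.run(), recorded should contain the open entry first, then the three request entries, and the close entry last.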