
Handling Errors From Task in Monix


I have the following code snippet that performs a simple HTTP lookup for a set of URLs.

  def parse(filter: ParserFilter): Task[Seq[HttpBinaryResponse]] = {
    import scalaj.http._
    // Extracts the href attribute of every anchor element, each wrapped in an Option
    val browser = new JsoupBrowser {
      override def requestSettings(conn: Connection): Connection =
        conn.timeout(2000)
    }
    val hrefs =
      browser.get(filter.url) >> elementList("a[href]") >?> attr("href")
    val batched = hrefs.distinct.flatten
      .filter(_.startsWith("http"))
      //.map(toTask)
      .map(href =>
        Task {
          HttpBinaryResponse.asHttpBinaryResponse(href, Http(href).asString)
        })
      .sliding(30, 30)
      .toSeq
      .map(chunk => Task.parSequence(chunk))

    Task.sequence(batched).map(_.flatten)
  }

I have a Play controller where I call this function and run the Task as below:

val batches = appBindings.httpService.parse(parserFilter)
batches.runToFuture.materialize.map {
  case Success(elems) =>
    Ok(Json.prettyPrint(Json.obj(
      "baseURL" -> s"${parserFilter.url}",
      "Total Elements" -> s"${elems.size}",
      "results" -> HttpBinaryResponse.asJson(elems)
    ))).enableCors
  case Failure(err) =>
    Ok(Json.obj("status" -> "error", "message" -> s"${err.getMessage}")).enableCors
}

One of the URLs fails with an SSLHandshakeException, and I end up in the Failure(err) case. Instead, I would like the following:

  1. No matter what the error is, I want to land in the Success case, where I already capture the error message for each failed URL.

How do I tweak my Task implementation to do this? I tried the onErrorRecoverWith handler, but it does not seem to have any effect. Any ideas?


Solution

  • I managed to get this working by recovering each per-URL Task individually, as below:

    def parse(filter: ParserFilter): Task[Seq[HttpBinaryResponse]] = {
        import scalaj.http._
        // Extracts the href attribute of every anchor element, each wrapped in an Option
        val browser = new JsoupBrowser {
          override def requestSettings(conn: Connection): Connection =
            conn.timeout(2000)
        }
        val hrefs =
          browser.get(filter.url) >> elementList("a[href]") >?> attr("href")
        val batched = hrefs.distinct.flatten
          .filter(_.startsWith("http"))
          //.map(toTask)
          .map(href =>
            Task {
              HttpBinaryResponse.asHttpBinaryResponse(href, Http(href).asString)
            }.onErrorRecoverWith { case ex: Exception =>
              Task.now(
                HttpBinaryResponse(
                  origin = href,
                  isSuccess = false,
                  errorMessage = Some(s"${ex.getMessage}")))
            })
          .sliding(30, 30)
          .toSeq
          .map(chunk => Task.parSequence(chunk))
    
        Task.sequence(batched).map(_.flatten)
    }
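
The reason this works is that Task.parSequence fails as a whole as soon as any one of its tasks fails, so the recovery has to be attached to each individual Task before the tasks are combined. Below is a minimal sketch of that pattern in isolation, assuming Monix 3.2 or later on the classpath. The names Result, fetch, fetchSafe and fetchAll are hypothetical stand-ins (they are not part of the code above), fetch simulates a failing request instead of doing real HTTP, and onErrorHandle is used in place of onErrorRecoverWith because the fallback here is a plain value rather than another Task.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for HttpBinaryResponse, reduced to the fields used here.
final case class Result(
    origin: String,
    isSuccess: Boolean,
    errorMessage: Option[String] = None)

object RecoverPerTask {

  // Hypothetical fetch: fails for any URL containing "bad",
  // standing in for a request that throws (e.g. an SSL handshake error).
  def fetch(url: String): Task[Result] = Task {
    if (url.contains("bad"))
      throw new RuntimeException(s"handshake failed for $url")
    Result(url, isSuccess = true)
  }

  // Recover each task on its own, so a failure becomes an ordinary value
  // instead of failing the whole batch.
  def fetchSafe(url: String): Task[Result] =
    fetch(url).onErrorHandle { ex =>
      Result(url, isSuccess = false, errorMessage = Some(ex.getMessage))
    }

  // Run the URLs in parallel batches; the batches themselves run one after another.
  // grouped(n) produces the same chunks as sliding(n, n).
  def fetchAll(urls: Seq[String], batchSize: Int): Task[Seq[Result]] = {
    val batches = urls
      .grouped(batchSize)
      .toSeq
      .map(chunk => Task.parSequence(chunk.map(fetchSafe)))
    Task.sequence(batches).map(_.flatten)
  }

  def main(args: Array[String]): Unit = {
    val urls = Seq("http://ok-1", "http://bad", "http://ok-2")
    // Blocking here is only for demonstration purposes.
    val results = fetchAll(urls, batchSize = 2).runSyncUnsafe(5.seconds)
    results.foreach(println)
  }
}
```

With this shape, the combined Task should always complete successfully with one Result per URL, so the controller lands in the Success case and the per-URL error messages travel inside the results. Combining the unrecovered fetch tasks with Task.parSequence instead would fail the whole batch on the first bad URL.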