scala, asynchronous, monix

Error Handling on Monix parallel tasks (using parMap)


I am trying to use Monix to parallelize certain operations and then perform error handling.

Let's say I am trying to parse and validate a couple of objects like this

def parseAndValidateX(x: X): Task[X]

and

def parseAndValidateY(y: Y): Task[Y]

Here X and Y are some types I have defined.

Now each of these methods evaluates some criteria and returns a Task. If the evaluation fails, I have some code of the form

Task.raiseError(new CustomException("X is invalid because certain reasons a,b,c"))

I raise a similar error for Y.

Task.raiseError(new CustomException("Y is invalid because certain reasons a',b',c'"))

Now I have this type

case class Combined(x: X, y: Y)

and I defined this

private def parseAndValidateCombined(x: X, y: Y): Task[Combined] = {
  val xTask = parseAndValidateX(x)
  val yTask = parseAndValidateY(y)
     
  Task.parMap2(xTask, yTask) {
    (xEval, yEval) => Combined(xEval, yEval)
  }
}

This should allow me to run the validation in parallel, and sure enough I get the response.

However, I also want this behavior:

In the event both tasks fail, I want to return an error like this

Task.raiseError(new CustomException("X is invalid because certain reasons a,b,c and Y is invalid because certain reasons a',b',c'"))

I cannot seem to do this. Whichever of the two tasks fails, I can only get one of the two failures when recovering from errors (e.g. with onErrorRecover) on the parMap2 output.

In case both do fail, I only get the error for task X.

Is it possible to accomplish this with Monix in a fully async way (for instance, with some other method to compose tasks together)? Or would I have to block execution, get the errors individually and recompose the values?


Solution

  • With parMap2 alone, it is not possible to accomplish what you want. The documentation says:

    In case one of the tasks fails, then all other tasks get cancelled and the final result will be a failure.

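    However, you can get what you want by exposing the errors as values instead of hiding them behind the monadic error handling. The materialize method does this: it turns a Task[A] into a Task[Try[A]], so each failure arrives as a Failure value inside a successful task.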

    So, for instance, you can implement your method as:

    import scala.util.{Failure, Success}

    private def parseAndValidateCombined(x: X, y: Y): Task[Combined] = {
      val xTask = parseAndValidateX(x).materialize // now a Task[Try[X]]
      val yTask = parseAndValidateY(y).materialize // now a Task[Try[Y]]

      Task.parMap2(xTask, yTask) {
        case (Success(xEval), Success(yEval)) => Success(Combined(xEval, yEval))
        // Both failed: build the combined error from both causes here.
        case (Failure(_), Failure(_)) => Failure[Combined](new CustomException(....))
        case (Failure(left), _) => Failure[Combined](left)
        case (_, Failure(right)) => Failure[Combined](right)
      }.dematerialize // back to Task[Combined]
    }
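
For reference, here is a minimal end-to-end sketch of the materialize/dematerialize approach, assuming Monix 3.x. The definitions of X, Y and CustomException, the validation rules, and the error messages are hypothetical stand-ins, since the question does not show them. Joining the two messages with " and " follows the format requested in the question; the `combine` helper is likewise an illustrative name, used only to give the result type explicitly.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical stand-ins for the types in the question.
final case class X(value: Int)
final case class Y(value: String)
final case class Combined(x: X, y: Y)
final class CustomException(message: String) extends Exception(message)

object Validation {
  // Assumed validation rule: X must be non-negative.
  def parseAndValidateX(x: X): Task[X] =
    if (x.value >= 0) Task.now(x)
    else Task.raiseError(new CustomException("X is invalid because it is negative"))

  // Assumed validation rule: Y must be non-empty.
  def parseAndValidateY(y: Y): Task[Y] =
    if (y.value.nonEmpty) Task.now(y)
    else Task.raiseError(new CustomException("Y is invalid because it is empty"))

  // Merges the two materialized results; when both failed, both messages are kept.
  private def combine(xTry: Try[X], yTry: Try[Y]): Try[Combined] =
    (xTry, yTry) match {
      case (Success(xEval), Success(yEval)) => Success(Combined(xEval, yEval))
      case (Failure(left), Failure(right)) =>
        Failure(new CustomException(s"${left.getMessage} and ${right.getMessage}"))
      case (Failure(left), _)  => Failure(left)
      case (_, Failure(right)) => Failure(right)
    }

  def parseAndValidateCombined(x: X, y: Y): Task[Combined] =
    Task
      .parMap2(parseAndValidateX(x).materialize, parseAndValidateY(y).materialize)(combine)
      .dematerialize // Task[Try[Combined]] back to Task[Combined]
}

object Demo extends App {
  // attempt exposes the final error as a Left instead of a failed Future.
  val result: Either[Throwable, Combined] = Await.result(
    Validation.parseAndValidateCombined(X(-1), Y("")).attempt.runToFuture,
    5.seconds
  )
  println(result.left.map(_.getMessage))
}
```

Because the materialized tasks themselves never fail, parMap2 has no failure to short-circuit on, so both validations run to completion and both errors reach `combine`. Everything stays asynchronous; the Await in Demo is only there to print the result.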