How can I handle a failure during the asynchronous execution of a task? I.e., at least print the stack trace and shut down. The code below seems to wait forever for inputs > 5.
val things = Range(1, 40)
implicit val scheduler = monix.execution.Scheduler.global
def t(i: Int) = Task.eval {
  Try {
    Thread.sleep(1000)
    val result = i + 1
    if (result > 5) {
      throw new Exception("asdf")
    }
    // i.e. write to file, that's why unit is returned
    println(result) // Effect
    "Result"
  }
}
val futures = things.map(e => t(e))
futures.foreach(_.runToFuture)
Trying:
futures.foreach(_.runToFuture.onComplete {
  case Success(value) =>
    println(value)
  case Failure(ex) =>
    System.err.println(ex)
    System.exit(1)
})
will not stop the computation. How can I log the stack trace and cancel the ongoing computations and stop?
A more idiomatic approach would be to use Observable instead of Task, since it is dealing with a list of data (I'm assuming that's the use case, since it is shown in the question).
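import monix.eval.Task
import monix.execution.Scheduler.Implicits.global // implicit Scheduler for runToFuture and recover
import monix.reactive.Observable
import scala.util.control.NonFatal

val obs = Observable
  .fromIterable(Range(1, 40))
  .mapEval(i =>
    if (i + 1 > 5) Task.raiseError(new Exception("Error")) // will stop the stream
    else Task.delay(println(i)) // Or write to file in your case
  )
  .completedL
  .runToFuture

obs
  .recover {
    case NonFatal(e) => println("Error")
  }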
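To get what the question literally asks for (print the stack trace, then shut down), the same stream can be awaited and its outcome inspected. The following is only a sketch under a few assumptions: `StopOnFirstError`, `process` and `runAll` are hypothetical names, `process` stands in for the real file-writing step, and blocking with `Await.result` is acceptable at the edge of the program. Note that the body is not wrapped in `Try`. In the question's version, `Try` catches the exception, so the task completes successfully with a `Failure` value and the `case Failure(ex)` branch of `onComplete` never fires.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

object StopOnFirstError {
  // Hypothetical stand-in for the real work (for example, writing to a file).
  // There is no Try wrapper here, so a thrown exception fails the Task.
  def process(i: Int): Task[Unit] =
    Task.eval {
      val result = i + 1
      if (result > 5) throw new Exception(s"asdf (input $i)")
      println(result)
    }

  // Processes the inputs one at a time and blocks until the stream
  // either completes or fails with the first error.
  def runAll(inputs: Iterable[Int]): Try[Unit] = {
    val done = Observable
      .fromIterable(inputs)
      .mapEval(i => process(i))
      .completedL
      .runToFuture
    Try(Await.result(done, Duration.Inf))
  }

  def main(args: Array[String]): Unit =
    runAll(Range(1, 40)) match {
      case Success(_) =>
        println("Completed successfully")
      case Failure(ex) =>
        ex.printStackTrace() // full stack trace, not just the message
        sys.exit(1)
    }
}
```

Because `mapEval` evaluates one element at a time, nothing after the failing element is started, so there are no other in-flight computations left to cancel in this sequential version.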
Alternatively, you can also signal the error with Either, which leads to better type safety, since you'll need to handle the Either result.
val obs = Observable
  .fromIterable(Range(1, 40))
  .mapEval(i =>
    if (i + 1 > 5) Task.pure(Left("Error"))
    else Task.delay(println(i)).map(_ => Right(())) // Or write to file in your case
  )
  .takeWhileInclusive(_.isRight) // will also emit the failing result
  .lastL
  .runToFuture

obs.map {
  case Left(err) => println("There's an error")
  case _ => println("Completed successfully")
}
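Both snippets above process the elements sequentially, whereas the question starts all tasks at once. If the work should stay concurrent, one option is Monix's `mapParallelUnordered`, which, as far as I understand it, ends the stream on the first failed task and cancels the tasks still in flight; this is worth verifying against the Monix version in use. The sketch below rests on assumptions: `ParallelStopOnError`, `process` and `runAll` are hypothetical names, the parallelism of 4 is arbitrary, and `Task.sleep` replaces `Thread.sleep` so that a pending delay can actually be cancelled.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object ParallelStopOnError {
  // Hypothetical stand-in for the real work. Task.sleep is a cancelable,
  // non-blocking delay, unlike Thread.sleep.
  def process(i: Int): Task[Unit] =
    Task.sleep(100.millis).flatMap { _ =>
      Task.eval {
        val result = i + 1
        if (result > 5) throw new Exception(s"asdf (input $i)")
        println(result)
      }
    }

  // Runs up to `parallelism` tasks at once and blocks until the stream
  // completes or fails; the result is a Failure if any task failed.
  def runAll(inputs: Iterable[Int], parallelism: Int = 4): Try[Unit] = {
    val done = Observable
      .fromIterable(inputs)
      .mapParallelUnordered(parallelism)(i => process(i))
      .completedL
      .runToFuture
    Try(Await.result(done, Duration.Inf))
  }
}
```

The returned `Try` can then be matched at the call site, calling `ex.printStackTrace()` and `sys.exit(1)` in the `Failure` case.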