scala functional-programming scala-cats fs2

Skip errors in the infinite Stream


I have an infinite fs2.Stream which may encounter errors. I'd like to skip those errors, doing nothing beyond perhaps logging them, and keep streaming further elements. Example:

//An example
val stream = fs2.Stream
    .awakeEvery[IO](1.second)
    .evalMap(_ => IO.raiseError(new RuntimeException))

In this specific case, I'd like to get an infinite fs2.Stream that emits Left(new RuntimeException) every second.

There is a Stream.attempt method, but the stream it produces terminates after the first error is encountered. Is there a way to just skip errors and keep pulling further elements?

Calling attempt on the effect itself, as in IO.raiseError(new RuntimeException).attempt, won't work in general, since it would require attempting every effect at every stage of the stream pipeline.


Solution

  • There's no way to handle errors the way you described. When a stream encounters its first error, it terminates. Please check this gitter question.

    You can handle it in two ways:

    1. Attempt the effect (but you already mentioned it is not possible in your case).

    2. Restart the stream after it terminates:

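    import cats.effect.IO
    import fs2.Stream
    import scala.concurrent.duration._

    // lazy because the definition refers to itself (a plain local val is rejected as a forward reference)
    lazy val stream: Stream[IO, Either[Throwable, Unit]] = Stream
        .awakeEvery[IO](1.second)
        .evalMap(_ => IO.raiseError(new RuntimeException))
        .handleErrorWith(t => Stream(Left(t)) ++ stream) // emit the error as a Left, then restart the stream
    
    // the stream restarts indefinitely, so take only the first 3 values
    println(stream.take(3).compile.toList.unsafeRunSync())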
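
To make the difference between Stream.attempt and option 1 concrete, here is a minimal sketch. It assumes fs2 3.x on cats-effect 3 (hence the `cats.effect.unsafe.implicits.global` import). The names `SkipErrorsSketch` and `process`, and the finite input list, are hypothetical and chosen only so the result is deterministic. They are not part of the original question.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // cats-effect 3 runtime (assumption)
import fs2.Stream

object SkipErrorsSketch {
  // Hypothetical effect: fails on even inputs, succeeds on odd ones.
  def process(n: Int): IO[Int] =
    if (n % 2 == 0) IO.raiseError(new RuntimeException(s"failed on $n"))
    else IO.pure(n * 10)

  // A small finite input so the behaviour is easy to inspect.
  def inputs: Stream[IO, Int] = Stream.emits(List(1, 2, 3, 4)).covary[IO]

  // Stream.attempt: the first failure is emitted as a Left, then the stream ends.
  def wholeStream: IO[List[Either[String, Int]]] =
    inputs
      .evalMap(n => process(n))
      .attempt
      .map(_.left.map(_.getMessage))
      .compile
      .toList

  // IO.attempt inside evalMap: each failure becomes a Left and the stream continues.
  def perElement: IO[List[Either[String, Int]]] =
    inputs
      .evalMap(n => process(n).attempt)
      .map(_.left.map(_.getMessage))
      .compile
      .toList

  def main(args: Array[String]): Unit = {
    println(wholeStream.unsafeRunSync()) // List(Right(10), Left(failed on 2))
    println(perElement.unsafeRunSync())  // List(Right(10), Left(failed on 2), Right(30), Left(failed on 4))
  }
}
```

If only the successful values matter, appending `.collect { case Right(v) => v }` to the per-element variant drops the failures. A logging step could be placed before that `collect` instead of discarding them silently.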