
Handling Exceptions in Scala FS2 Stream Transformation flow


import cats.effect.{IO, IOApp}
import fs2.Pipe
import fs2.Stream

object Test extends IOApp.Simple {

  final case class Student(id: Int, name: String)

  private val studentData: Map[Int, Student] = Map(1 -> Student(1, "Olivia"), 2 -> Student(2, "Liam"))

  override def run: IO[Unit] = {

    def firstTransformation: Pipe[IO, Int, Student] = {
      stream =>
        // In the real code a database or external API call happens here.
        // studentData(3) throws NoSuchElementException, which fails the whole stream.
        stream.evalMap(id => IO(studentData(id)))
    }

    def secondTransformation: Pipe[IO, Student, String] = {
      stream =>
        stream.evalMap(student => IO(student.name)) // In the real code a database or external API call happens here.
    }

    val sourceStream: Stream[IO, Int] = Stream(1, 2, 3).repeat

    sourceStream
      .through(firstTransformation)
      .through(secondTransformation)
      .evalMap(name => IO.println(s"name $name")) // run the side effect in IO instead of a bare println
      .compile
      .drain
  }
}

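How can exceptions in FS2 stream transformations be handled so that the stream keeps flowing even if an exception occurs midway through a transformation? In the setup above, id 3 has no entry in studentData, so studentData(3) throws a NoSuchElementException inside firstTransformation and the whole stream stops. Instead, the stream should skip the failing element and keep printing only "Olivia" and "Liam", indefinitely.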

It should output, repeating forever:

name Olivia
name Liam
name Olivia
name Liam
name Olivia
name Liam
...

More generally, how should the stream flow be designed so that an exception for one element doesn't halt the whole FS2 stream? Thanks in advance.


Solution

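  • Short answer: handle errors inside each transformation step, so that a failure for one element becomes a value the stream can skip instead of an error that terminates the stream.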

    Also: I agree with @Daenyth about how to represent your transformation functions. You should generally follow the principle of least power: if a transformation works on one element at a time, represent it as a plain function such as Int => IO[Option[Student]] rather than wrapping it in a Pipe. A plain function can be called in more situations (directly, in a for-comprehension, in a test, or inside any Stream combinator), and it can still be turned into a Pipe later if you need one.
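    As a minimal sketch of that point, assuming the Student type and studentData map from the question (repeated so the block compiles on its own), the plain function below can be called directly, and a Pipe can be derived from it in one line when a stream API wants one. The names LeastPower, lookupPipe, single and viaPipe are illustrative only.

    ```scala
    import cats.effect.IO
    import fs2.{Pipe, Stream}

    object LeastPower {
      final case class Student(id: Int, name: String)

      private val studentData: Map[Int, Student] =
        Map(1 -> Student(1, "Olivia"), 2 -> Student(2, "Liam"))

      // A plain effectful function: usable on its own, in tests, or in any Stream combinator.
      def lookupStudent(id: Int): IO[Option[Student]] =
        IO(studentData.get(id))

      // Deriving a Pipe from the function is a one-liner (illustrative name).
      val lookupPipe: Pipe[IO, Int, Option[Student]] =
        _.evalMap(id => lookupStudent(id))

      // Used directly, with no stream involved.
      val single: IO[Option[Student]] = lookupStudent(1)

      // Used inside a stream through the derived Pipe.
      val source: Stream[IO, Int] = Stream(1, 3)
      val viaPipe: Stream[IO, Option[Student]] = source.through(lookupPipe)
    }
    ```

    Going the other way, from a Pipe back to a per-element call, means building and compiling a one-element Stream, which is why the plain function is the more flexible starting point.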

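    // Stand-in for a DB/API call: return None instead of throwing if the student doesn't exist
    def lookupStudent(id: Int): IO[Option[Student]] =
      IO(studentData.get(id))

    // Stand-in for a DB/API call that may still fail in unexpected ways
    def getStudentName(student: Student): IO[String] =
      IO(student.name)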
    

    In the above, the lookupStudent step accounts for the possibility of a "no such student" outcome as part of the return type, instead of as a thrown exception.
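    To make the difference concrete, here is a small sketch, assuming the question's studentData map: Map.apply raises an error inside IO for a missing key, while Map.get turns the same case into an ordinary None. The object name OptionVsThrow and the method lookupThrowing are hypothetical names for illustration.

    ```scala
    import cats.effect.IO

    object OptionVsThrow {
      final case class Student(id: Int, name: String)

      private val studentData: Map[Int, Student] =
        Map(1 -> Student(1, "Olivia"), 2 -> Student(2, "Liam"))

      // Question's style: a missing id becomes a raised NoSuchElementException inside IO.
      def lookupThrowing(id: Int): IO[Student] = IO(studentData(id))

      // Answer's style: a missing id is a normal value, None.
      def lookupStudent(id: Int): IO[Option[Student]] = IO(studentData.get(id))
    }
    ```

    With the Option version, a missing student never reaches the stream as an error, so there is nothing that could terminate it.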

    In both steps, you said that an API call or DB lookup might happen. Since these could fail in unforeseen ways, you need to decide what should happen in those situations. You could catch the error and return a None, effectively swallowing the error. I don't generally recommend doing that, but hey, it's your program.

    def lookupStudentResiliently(id: Int): IO[Option[Student]] =
      lookupStudent(id).recover {
        case _: Exception => None // swallow the error and treat it as "not found"
      }

    def getStudentNameResiliently(student: Student): IO[Option[String]] =
      getStudentName(student).redeem[Option[String]](
        _ => None,         // on error: treat it as "no name"
        name => Some(name) // on success: wrap the name
      )
    

    The two ...Resiliently functions above catch errors raised by the underlying lookupStudent / getStudentName IOs and treat them as if the lookup had simply returned None.
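    If silently swallowing feels too risky, a variant of these wrappers can report the error before returning None. This is a minimal sketch, assuming a hypothetical flakyLookup that fails the same way the question's IO(studentData(id)) does; IO.println stands in for whatever logging you actually use.

    ```scala
    import cats.effect.IO

    object LoggedRecovery {
      final case class Student(id: Int, name: String)

      private val studentData: Map[Int, Student] =
        Map(1 -> Student(1, "Olivia"), 2 -> Student(2, "Liam"))

      // Hypothetical lookup that raises an error for a missing id
      // (Map.apply throws NoSuchElementException, which IO(...) captures).
      def flakyLookup(id: Int): IO[Student] = IO(studentData(id))

      // Same "skip on failure" result as lookupStudentResiliently,
      // but the error is printed first instead of disappearing.
      def lookupOrReport(id: Int): IO[Option[Student]] =
        flakyLookup(id)
          .map(student => Option(student))
          .handleErrorWith { e =>
            IO.println(s"lookup failed for id $id: $e").as(Option.empty[Student])
          }
    }
    ```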

    Then you can incorporate these lookups into your stream using any of the various fs2.Stream composition methods. I'll use evalMapFilter in my example, since it fits the (presumably) desired semantics of "just skip it if it doesn't work", where "it doesn't work" is represented by one of the transformations returning None.

    sourceStream
      .evalMapFilter(lookupStudentResiliently) // changed
      .evalMapFilter(getStudentNameResiliently) // changed
      .evalMap(name => IO.println(s"name $name")) // run the side effect in IO instead of a bare println
      .compile
      .drain
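    Putting the pieces together, here is a self-contained sketch of the whole program, assuming cats-effect 3 and fs2 3. The lookup bodies are stand-ins for the real DB/API calls, and the object name ResilientStudents is illustrative. The names stream is split out from run so the output can be checked with take.

    ```scala
    import cats.effect.{IO, IOApp}
    import cats.syntax.all._ // ApplicativeError syntax, so recover is available on IO
    import fs2.Stream

    object ResilientStudents extends IOApp.Simple {
      final case class Student(id: Int, name: String)

      private val studentData: Map[Int, Student] =
        Map(1 -> Student(1, "Olivia"), 2 -> Student(2, "Liam"))

      // Stand-ins for the DB/API calls; a missing id is None rather than an exception.
      def lookupStudent(id: Int): IO[Option[Student]] = IO(studentData.get(id))
      def getStudentName(student: Student): IO[String] = IO(student.name)

      // Unexpected failures are also turned into None, so evalMapFilter drops the element.
      def lookupStudentResiliently(id: Int): IO[Option[Student]] =
        lookupStudent(id).recover { case _: Exception => None }

      def getStudentNameResiliently(student: Student): IO[Option[String]] =
        getStudentName(student).redeem[Option[String]](_ => None, name => Some(name))

      val sourceStream: Stream[IO, Int] = Stream(1, 2, 3).repeat

      // Elements whose lookup yields None are skipped instead of failing the stream.
      val names: Stream[IO, String] =
        sourceStream
          .evalMapFilter(lookupStudentResiliently)
          .evalMapFilter(getStudentNameResiliently)

      // Runs until interrupted, printing "name Olivia" and "name Liam" alternately.
      override def run: IO[Unit] =
        names.evalMap(name => IO.println(s"name $name")).compile.drain
    }
    ```

    Because id 3 now produces None rather than an error, evalMapFilter simply drops it and the stream never fails, which gives the endless Olivia / Liam output the question asks for.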