scalacats-effectfs2

fs2 stream does not interrupt on Deferred


The fs2 stream does not interrupt here:

import cats.effect.*
import fs2.Stream
import scala.concurrent.duration.*
import cats.effect.unsafe.implicits.global

val test = for {
      cancel <- Deferred[IO, Either[Throwable, Unit]]
      _ <- (IO.unit.delayBy(5.seconds).map { _ => println("Completing deferred"); cancel.complete(Right(())) }).start
      _ <- Stream.awakeEvery[IO](1.second).map(x => println(x)).interruptWhen(cancel).compile.drain
    } yield ()

test.unsafeRunSync()

but it interrupts if we swap the lines and fibers:

import cats.effect.*
import fs2.Stream
import scala.concurrent.duration.*
import cats.effect.unsafe.implicits.global

val test = for {
      cancel <- Deferred[IO, Either[Throwable, Unit]]
      _ <- Stream.awakeEvery[IO](1.second).map(x => println(x)).interruptWhen(cancel).compile.drain.start
      _ <- (IO.unit.delayBy(5.seconds).map { _ => println("Completing deferred"); cancel.complete(Right(())) })
    } yield ()

test.unsafeRunSync()

I wonder why...


Solution

  • The issue is that you are not using IO properly.
    Remember an IO[A] is just a program description, a value. It does nothing on its own.

    When you call cancel.complete you are just creating a new program, it is not doing anything unless you compose it with other programs. And you are composing it in a map method; which doesn't really combine the programs, so your cancel is lost, and the start will just create a fiber that will create such a program and discard it.

    In the second example, since for translates everything to a flatMap you ended up composing the program by accident.

    The quick solution is to use flatMap rather than map in the first example. But, IMHO, a better solution is using proper combinators like this:

    val run: IO[Unit] =
      Deferred[IO, Either[Throwable, Unit]].flatMap { cancelToken =>
        val cancel =
          IO.sleep(5.seconds) >>
          IO.println("Completing deferred") >>
          cancelToken.complete(Right(()))
    
        val program =
          IO.println("Starting stream") >>
          Stream
            .awakeEvery[IO](1.second)
            .foreach(x => IO.println(x))
            .interruptWhen(cancelToken)
            .compile
            .drain >>
          IO.println("Stream finished")
    
      cancel.background.surround(program)
    }
    

    You can see the code running here.