scala fs2 cats-effect

How to terminate FS2 stream started from SBT Shell?


If I run this program from the SBT shell and then cancel it, it keeps printing "hello". I have to exit SBT to make it stop. Why is that?

import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._

object FS2 extends IOApp {

  override def run(args: List[String]) = 
      Stream.awakeEvery[IO](5.seconds).map { _ =>
        println("hello")
      }.compile.drain.as(ExitCode.Error)
}

Solution

  • As already mentioned in the comments, your application runs in another thread and never terminates because the stream is infinite. You therefore have to terminate it manually when the application receives a signal such as SIGTERM or SIGINT (SIGINT is the one sent when you hit Ctrl+C to stop the app).

    You could do something like this:

    1. Create an instance of Deferred.
    2. Use it to trigger interruptWhen once either a TERM or an INT signal is received.

    For example:

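    import cats.effect.{Deferred, ExitCode, IO, IOApp}
    import fs2.Stream
    import scala.concurrent.duration._
    import sun.misc.Signal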
    
    object FS2 extends IOApp {
    
      override def run(args: List[String]): IO[ExitCode] = for {
        cancel <- Deferred[IO, Either[Throwable, Unit]] //deferred used as a flag telling whether a termination signal
                                                        //was received
        _ <- (IO.async_[Unit]{ cb =>
          Signal.handle(
            new Signal("INT"), //INT and TERM signals are nearly identical, we have to handle both
            (sig: Signal) => cb(Right(()))
          )
          Signal.handle(
            new Signal("TERM"),
            (sig: Signal) => cb(Right(()))
          )
        } *> cancel.complete(Right(()))).start //once an INT or TERM signal is intercepted, this completes
                                               //the deferred and the fiber terminates
                                               //start runs the wait for the signal in a separate fiber;
                                               //otherwise the program would block here
        app <- Stream.awakeEvery[IO](1.seconds).map { _ => //your stream
          println("hello")
        }.interruptWhen(cancel).compile.drain.as(ExitCode.Error)  //interruptWhen ends stream when deferred completes
      } yield app
    
    }
    

    This version of the app will terminate whenever you hit Ctrl+C in the sbt shell.
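
To try the Deferred + interruptWhen mechanism on its own, without OS signals, here is a minimal sketch. It assumes Cats Effect 3 and fs2 3.x. The object name InterruptDemo and the 3-second timer are hypothetical: the timer merely stands in for the INT/TERM handler above.

```scala
import cats.effect.{Deferred, ExitCode, IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._

// Hypothetical demo object; not part of the original answer.
object InterruptDemo extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = for {
    // Flag that is completed once the stream should stop
    cancel <- Deferred[IO, Either[Throwable, Unit]]
    // Assumption: a timer replaces the signal handler and completes the flag
    // after 3 seconds; start runs it in a separate fiber so we do not block here
    _ <- (IO.sleep(3.seconds) *> cancel.complete(Right(()))).start
    // The stream prints once per second until the deferred is completed
    _ <- Stream.awakeEvery[IO](1.second)
      .evalMap(_ => IO.println("hello"))
      .interruptWhen(cancel)
      .compile
      .drain
  } yield ExitCode.Success
}
```

Running it should print "hello" a few times and then exit by itself, which shows that completing the deferred is what ends the stream; the signal handler in the answer is just one way of completing it.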