scala, fs2, cats-effect

Splitting the fs2 stream output to two files


I'm just starting my adventure with fs2 streams. What I want to achieve is to read a file (a large one, which is why I use fs2), transform it, and write the result to one of two different files, based on some predicate. Some code (from https://github.com/typelevel/fs2), with my comment:

  val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0/9.0)

    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble).toString)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
      /* instead of the last line I want something like this:
      .through(<write temperatures higher than 10 to one file, the rest to the other one>)
      */
  }

What is the most efficient way to do so? The obvious solution is to have two streams with different filters, but that is inefficient: the input file would be read twice (two passes).
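For reference, the two-pass version mentioned above might look roughly like this (a sketch assuming the same fs2 2.x API as the snippet above; the output file names are illustrative):

```scala
import java.nio.file.Paths
import cats.effect.{Blocker, IO}
import fs2.{Stream, io, text}

object NaiveTwoPass {

  def fahrenheitToCelsius(f: Double): Double =
    (f - 32.0) * (5.0 / 9.0)

  // One full read of the input file per output file
  def convert(blocker: Blocker, fileName: String)(p: Double => Boolean): Stream[IO, Unit] =
    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))
      .filter(p) // keep only this file's half of the values
      .map(_.toString)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(Paths.get(fileName), blocker))

  // Sequencing the two streams means the source file is read twice
  def naiveConverter(blocker: Blocker): Stream[IO, Unit] =
    convert(blocker, "testdata/celsius-over.txt")(_ > 10) ++
      convert(blocker, "testdata/celsius-under.txt")(_ <= 10)
}
```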


Solution

  • Unfortunately, as far as I know, there's no easy way to split an fs2 stream in two.

    What you can do is split your stream by pushing each value to one of two queues (one for values above 10, the other for values of 10 or below). If we use a NoneTerminatedQueue, the queues stay open until we enqueue None into them. We can then use dequeue to create a separate stream from each queue, which runs until that queue is closed.

    Example solution below. I split reading and writing to files into separate methods:

    import java.nio.file.Paths
    import cats.effect.{Blocker, ExitCode, IO, IOApp}
    import fs2.concurrent.{NoneTerminatedQueue, Queue}
    import fs2.{Stream, io, text}
    
    object FahrenheitToCelsius extends IOApp {
    
      def fahrenheitToCelsius(f: Double): Double =
        (f - 32.0) * (5.0 / 9.0)
    
      //I split reading into a separate method
      def read(blocker: Blocker, over: NoneTerminatedQueue[IO, Double], under: NoneTerminatedQueue[IO, Double]): Stream[IO, Unit] =
        io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
        .through(text.utf8Decode)
        .through(text.lines)
        .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
        .map(line => fahrenheitToCelsius(line.toDouble))
        .evalMap { value =>
          if (value > 10) { //here we route each value to one of the two queues
            over.enqueue1(Some(value)) //enqueueing Some keeps the queue open
          } else {
            under.enqueue1(Some(value))
          }
        }
        .onFinalize(
          over.enqueue1(None) *> under.enqueue1(None) //enqueueing None terminates both queues
        )
    
      //write takes a source stream, a blocker and a target file name as arguments
      def write(s: Stream[IO, Double], blocker: Blocker, fileName: String): Stream[IO, Unit] = {
        s.map(_.toString)
          .intersperse("\n")
          .through(text.utf8Encode)
          .through(io.file.writeAll(Paths.get(fileName), blocker))
      }
    
      val converter: Stream[IO, Unit] = for {
        over <- Stream.eval(Queue.noneTerminated[IO, Double]) //here we create 2 queues
        under <- Stream.eval(Queue.noneTerminated[IO, Double])
        blocker <- Stream.resource(Blocker[IO])
        _ <- write(over.dequeue, blocker, "testdata/celsius-over.txt") //we run reading and writing to both
          .concurrently(write(under.dequeue, blocker, "testdata/celsius-under.txt")) //files concurrently
          .concurrently(read(blocker, over, under)) //the stream runs until the `over` queue is terminated
      } yield ()
    
      override def run(args: List[String]): IO[ExitCode] =
        converter
          .compile
          .drain
          .as(ExitCode.Success)
    
    }
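    As a possible alternative worth exploring: fs2 2.x also has Stream#broadcastThrough, which feeds every element to several pipes concurrently. Splitting can then be expressed as a filter inside each pipe, still with a single pass over the input file (each element is delivered to both pipes, so each branch just drops what it doesn't need). A sketch under the same fs2 2.x API as above (output file names are illustrative):

```scala
import java.nio.file.Paths
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import fs2.{Pipe, Stream, io, text}

object FahrenheitToCelsiusBroadcast extends IOApp {

  def fahrenheitToCelsius(f: Double): Double =
    (f - 32.0) * (5.0 / 9.0)

  // Writes one branch of the broadcast to its own file,
  // keeping only the values matching the predicate
  def writeFiltered(blocker: Blocker, fileName: String)(p: Double => Boolean): Pipe[IO, Double, Unit] =
    _.filter(p)
      .map(_.toString)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(Paths.get(fileName), blocker))

  val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))
      // broadcastThrough sends every element to all pipes;
      // each pipe filters down to its own subset before writing
      .broadcastThrough(
        writeFiltered(blocker, "testdata/celsius-over.txt")(_ > 10),
        writeFiltered(blocker, "testdata/celsius-under.txt")(_ <= 10)
      )
  }

  override def run(args: List[String]): IO[ExitCode] =
    converter.compile.drain.as(ExitCode.Success)
}
```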