scala fs2

How to unzip a file in a fs2 stream


Given a gzip-compressed file, say t.gz, I want to read its contents line by line.

I've been able to read the contents using:

Source.fromInputStream(new GZIPInputStream(new BufferedInputStream(new FileInputStream(s))))

However, I'm looking for a way to process these files in a functional style, which has brought me to fs2.

If I unzip the file, I can do something like this:

import cats.effect._
import fs2.io.file.{Files, Path}
import fs2.{Stream, text, compression, io}


object Main extends IOApp.Simple {
  def doThing(inPath: Path): Stream[IO, Unit] = {
    Files[IO]
      .readAll(inPath)
      .through(text.utf8.decode)
      .through(text.lines)
      .map(line => line) // placeholder for per-line processing
      .intersperse("\n")
      .through(text.utf8.encode)
      .through(io.stdout)
  }

  val run = doThing(Path("t")).compile.drain
}

Here the output just goes to the console, for simplicity.

If I leave the file compressed instead, I can't find anything that shows how these operations fit together to produce a Stream.

fs2 has a Compression object (https://www.javadoc.io/doc/co.fs2/fs2-docs_2.13/latest/fs2/compression/Compression.html) that looks like it should do what I need, but if it does, I haven't figured out how to integrate it.

So the question is: how do I read a gzip-compressed file into an fs2 stream so that I can process it in a functional style?


Solution

  • You probably want this:

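    import cats.effect._
    import fs2.compression.Compression
    import fs2.io.file.{Files, Path}
    import fs2.{Stream, text, io}

    object Main extends IOApp.Simple {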
      def doThing(inPath: Path): Stream[IO, Unit] = {
        Files[IO]
          .readAll(inPath)
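          .through(Compression[IO].gunzip()) // emits a GunzipResult: header metadata plus a content stream
          .flatMap(_.content)                // flatten to the decompressed bytes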
          .through(text.utf8.decode)
          .through(text.lines)
          .map(line => line) // placeholder for per-line processing
          .intersperse("\n")
          .through(text.utf8.encode)
          .through(io.stdout)
      }
    
      override final val run =
        doThing(Path("t")).compile.drain
    }
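
The reason for the extra flatMap is that gunzip does not emit bytes directly: it emits a GunzipResult, which carries the gzip header metadata alongside a content stream of decompressed bytes. To try the pipeline without a file on disk, here is a minimal sketch that compresses a string in memory and reads it back. It assumes fs2 3.x and cats-effect 3; the object name GunzipRoundTrip and the sample text are made up for illustration.

```scala
import cats.effect.{IO, IOApp}
import fs2.{Stream, text}
import fs2.compression.Compression

// Hypothetical demo object: gzip a small string in memory, then gunzip it
// with the same pipeline shape as the file-based version above.
object GunzipRoundTrip extends IOApp.Simple {

  val lines: IO[List[String]] =
    Stream
      .emit("first line\nsecond line")
      .covary[IO]
      .through(text.utf8.encode)
      .through(Compression[IO].gzip())   // stands in for the bytes of t.gz
      .through(Compression[IO].gunzip()) // Stream[IO, GunzipResult[IO]]
      .flatMap(_.content)                // Stream[IO, Byte], decompressed
      .through(text.utf8.decode)
      .through(text.lines)
      .compile
      .toList

  // Print the decoded lines to the console.
  val run: IO[Unit] = lines.flatMap(ls => IO.println(ls))
}
```

Replacing the emit/encode/gzip steps with Files[IO].readAll(path) gives the file-based version shown above.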