scala, scalaz-stream

Usage example of scalaz-stream's inflate


In the following usage example of scalaz-stream (taken from the documentation), what do I need to change if the input and/or the output is a gzipped file? In other words, how do I use the compress module (compress.inflate and compress.deflate)?

import scalaz.stream._
import scalaz.concurrent.Task

val converter: Task[Unit] =
  io.linesR("testdata/fahrenheit.txt")
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .to(io.fileChunkW("testdata/celsius.txt"))
    .run

// at the end of the universe...
val u: Unit = converter.run

Solution

  • Compressing the output is easy. Since compress.deflate() is a Process1[ByteVector, ByteVector], you need to plug it into your pipeline at the point where you are emitting ByteVectors, that is, right after text.utf8Encode, which is a Process1[String, ByteVector]:

    val converter: Task[Unit] =
      io.linesR("testdata/fahrenheit.txt")
        .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
        .map(line => fahrenheitToCelsius(line.toDouble).toString)
        .intersperse("\n")
        .pipe(text.utf8Encode)
        .pipe(compress.deflate())
        .to(io.fileChunkW("testdata/celsius.zip"))
        .run
    

    Decompressing the input takes a little more work, because you can't use io.linesR to read the compressed file: it emits Strings, while compress.inflate() consumes ByteVectors. You need a process that produces ByteVectors instead, so that you can pipe them into inflate (io.fileChunkR does that). The next step is to decode the uncompressed bytes to Strings (with text.utf8Decode, for example) and then use text.lines() to emit the text line by line. Something like this should do the trick:

    val converter: Task[Unit] =
      // Request the compressed file in chunks of up to 4096 bytes.
      Process.constant(4096).toSource
        .through(io.fileChunkR("testdata/fahrenheit.zip"))
        // ByteVector => ByteVector: decompress the raw bytes.
        .pipe(compress.inflate())
        // ByteVector => String, then split the decoded text into lines.
        .pipe(text.utf8Decode)
        .pipe(text.lines())
        .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
        .map(line => fahrenheitToCelsius(line.toDouble).toString)
        .intersperse("\n")
        .pipe(text.utf8Encode)
        .to(io.fileChunkW("testdata/celsius.txt"))
        .run
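
One caveat about formats, since the question mentions gzipped files: as far as I know, compress.deflate and compress.inflate are thin wrappers around java.util.zip.Deflater and Inflater, so with their default arguments they write and read zlib streams, not gzip files or zip archives (whatever the file extension says). The sketch below is not scalaz-stream code. It uses only the JDK to show that the two formats are distinct and that each needs its matching decoder. The object and method names (CompressionFormats, zlibCompress and so on) are made up for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.zip.{DataFormatException, Deflater, GZIPInputStream, GZIPOutputStream, Inflater}

// Hypothetical helper object, JDK only; not part of scalaz-stream.
object CompressionFormats {

  // zlib-wrapped DEFLATE: what java.util.zip.Deflater emits by default.
  def zlibCompress(input: Array[Byte]): Array[Byte] = {
    val deflater = new Deflater()
    deflater.setInput(input)
    deflater.finish()
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](4096)
    while (!deflater.finished()) {
      val n = deflater.deflate(buf)
      out.write(buf, 0, n)
    }
    deflater.end()
    out.toByteArray
  }

  // Inverse of zlibCompress; throws DataFormatException on non-zlib input.
  def zlibDecompress(input: Array[Byte]): Array[Byte] = {
    val inflater = new Inflater()
    inflater.setInput(input)
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](4096)
    try {
      while (!inflater.finished()) {
        val n = inflater.inflate(buf)
        // No progress and nothing left to read: the stream is incomplete.
        if (n == 0 && (inflater.needsInput() || inflater.needsDictionary()))
          throw new DataFormatException("truncated or unsupported zlib stream")
        out.write(buf, 0, n)
      }
    } finally inflater.end()
    out.toByteArray
  }

  // gzip: DEFLATE data with a gzip header and a CRC-32 trailer.
  def gzipCompress(input: Array[Byte]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(out)
    try gz.write(input) finally gz.close()
    out.toByteArray
  }

  // Inverse of gzipCompress.
  def gzipDecompress(input: Array[Byte]): Array[Byte] = {
    val gz = new GZIPInputStream(new ByteArrayInputStream(input))
    try {
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](4096)
      var n = gz.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = gz.read(buf)
      }
      out.toByteArray
    } finally gz.close()
  }

  def main(args: Array[String]): Unit = {
    val text = "32.0\n212.0\n".getBytes(UTF_8)
    val zlib = zlibCompress(text)
    val gzip = gzipCompress(text)
    // zlib streams start with 0x78; gzip streams start with 0x1f 0x8b.
    println(f"zlib starts with ${zlib(0) & 0xff}%02x")
    println(f"gzip starts with ${gzip(0) & 0xff}%02x ${gzip(1) & 0xff}%02x")
    // Each format round-trips through its own decoder.
    println(new String(zlibDecompress(zlib), UTF_8) == new String(gzipDecompress(gzip), UTF_8))
  }
}
```

If the files really are gzip (.gz), the pipelines above will probably not read or write them as they stand. One option, which I have not tested with scalaz-stream, is to do the gzip step with java.util.zip.GZIPInputStream and GZIPOutputStream and keep the rest of the pipeline unchanged.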