Tags: scala, scala-cats, fs2, cats-effect, doobie

Save Doobie stream from database to file


A Doobie select returns an fs2.Stream[doobie.ConnectionIO, String]. If we need to write it to a file, the obvious option is to call stream.compile.toList.transact(transactor) and then save the resulting list to the file.

Is there a way to save the result in a streaming fashion, without first converting it to a list?


Solution

  • The trick is to convert the cats.effect.IO operations into doobie.ConnectionIO with Async[doobie.ConnectionIO].liftIO(IO(...)). This makes it possible to combine file operations with database operations nicely. Here is a complete example program that streams the query results to a file.

    package com.example
    
    import java.io.BufferedWriter
    
    import better.files.File
    import cats.effect._
    import cats.implicits._
    import doobie._
    import doobie.implicits._
    import fs2.Stream
    
    
    /** Example application: recreates and fills the `example` table, then streams the
      * selected rows into `./example.txt` without first collecting them into a list.
      */
    object Example extends IOApp {

      /** Runs the table setup followed by the streaming export and reports success.
        *
        * @param args command-line arguments (not used)
        * @return `ExitCode.Success` once both steps have completed
        */
      override def run(args: List[String]): IO[ExitCode] = {
        val xa = Transactor.fromDriverManager[IO](
          "org.postgresql.Driver", // JDBC driver class name
          "jdbc:postgresql:example_db", // connect URL (driver-specific)
          "postgres", // user
          "" // password
        )

        val dropTable = sql"drop table if exists example".update.run
        val createTable =
          sql"create table if not exists example (id serial primary key, string_value text not null)".update.run
        val insertRows = Update[String]("insert into example (string_value) values (?)")
          .updateMany(List("one", "two", "three", "four", "five"))

        // Every statement goes through its own `transact(xa)` call, strictly in this order.
        val prepareTable: IO[Unit] =
          dropTable.transact(xa) *> createTable.transact(xa) *> insertRows.transact(xa).void

        val rows: Stream[doobie.ConnectionIO, String] =
          sql"select string_value from example".query[String].stream

        // Query and file writes are combined into one ConnectionIO program before transacting.
        val exportRows: IO[Unit] = writeToFile(rows).compile.drain.transact(xa)

        (prepareTable *> exportRows).map(_ => ExitCode.Success)
      }

      /** Suspends a side-effecting computation in `IO` and lifts it into `ConnectionIO`,
        * so that it can be sequenced together with database operations.
        */
      private def liftSideEffect[A](thunk: => A): doobie.ConnectionIO[A] =
        Async[doobie.ConnectionIO].liftIO(IO(thunk))

      /** Writes every string of `chunk` to `out`, in order. */
      private def writeChunk(out: BufferedWriter, chunk: fs2.Chunk[String]): doobie.ConnectionIO[Unit] =
        liftSideEffect(chunk.foreach(out.write))

      /** Streams `result` into `./example.txt`, placing a newline between consecutive elements.
        *
        * The file writer is acquired as a stream resource and the data is written chunk by chunk.
        *
        * @param result the strings to write
        * @return a stream that emits one `Unit` per written chunk
        */
      private def writeToFile(result: Stream[doobie.ConnectionIO, String]): Stream[doobie.ConnectionIO, Unit] = {
        val separated = result.intersperse("\n")
        Stream
          .resource(writer("./example.txt"))
          .flatMap(out => separated.chunks.evalMap(chunk => writeChunk(out, chunk)))
      }

      /** Describes a buffered writer for `path` that is closed when the resource is released.
        *
        * @param path location of the file to write
        * @return a resource wrapping the opened `BufferedWriter`
        */
      private def writer(path: String): Resource[doobie.ConnectionIO, BufferedWriter] = {
        val open: doobie.ConnectionIO[BufferedWriter] =
          liftSideEffect(File(path).newBufferedWriter)
        Resource.make(open)(opened => liftSideEffect(opened.close()))
      }
    }