scalaioqueuefs2

Writing elements to a file as they are dequeued from the queue : Scala fs2 Stream


I have a small test of fs2 streams, process elements, waiting and then write them to a file. I'm getting a type error saying and I could not figure out what it does mean:

Error : required: fs2.Stream[[x]cats.effect.IO[x],Unit] => fs2.Stream[[+A]cats.effect.IO[A],Unit], found : [F[_]]fs2.Pipe[F,Byte,Unit] import java.nio.file.Paths

import cats.effect.{Blocker, ExitCode, IO, IOApp, Timer}
import fs2.Stream
import fs2.io
import fs2.concurrent.Queue

import scala.concurrent.duration._
import scala.util.Random

class StreamTypeIntToDouble(q: Queue[IO, Int])(implicit timer: Timer[IO]) {
  import core.Processing._

  val blocker: Blocker =
    Blocker.liftExecutionContext(
      scala.concurrent.ExecutionContext.Implicits.global
    )
  def storeInQueue: Stream[IO, Unit] = {

    Stream(1, 2, 3)
      .covary[IO]
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
      .metered(Random.between(1, 20).seconds)
      .through(q.enqueue)

  }
  def getFromQueue: Stream[IO, Unit] = {
    q.dequeue
      .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
      .through(
        io.file
          .writeAll(Paths.get("file.txt"), blocker)
      )

  }
}

object Five extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q <- Queue.bounded[IO, Int](10)
      b = new StreamTypeIntToDouble(q)
      _ <- b.storeInQueue.compile.drain.start
      _ <- b.getFromQueue.compile.drain
    } yield ()
    program.as(ExitCode.Success)
  }
}

Solution

  • There are a couple of issues here, and the first is the most confusing. writeAll is polymorphic in its context F[_], but it requires a ContextShift instance for F (as well as Sync). You don't currently have a ContextShift[IO] in scope, so the compiler won't infer that the F for writeAll should be IO. If you add something like this:

    implicit val ioContextShift: ContextShift[IO] =
      IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)
    

    …then the compiler will infer IO as you'd expect.

    My suggestion for cases like this is to skip the type inference. Writing it out with the type parameter is only marginally more verbose:

      .through(
        io.file
          .writeAll[IO](Paths.get("file.txt"), blocker)
      )
    

    …and it means that you'll get useful error messages for things like missing type class instances.

    Once you've fixed that issue there are a couple of others. The next is that using evalMap in this context means that you'll have a stream of () values. If you change it to evalTap, the logging side effects will still happen appropriately, but you won't lose the actual values of the stream you call it on.

    The last issue is that writeAll requires a stream of bytes, while you've given it a stream of Ints. How you want to deal with that discrepancy depends on the intended semantics, but for the sake of example something like .map(_.toByte) would make it compile.