Tags: scala, functional-programming, scala-cats, cats-effect, fs2

fs2.Stream hangs on taking twice


Problem:

I want to repeatedly take batches from an fs2.Stream provided by a third-party library, so that clients are abstracted away from the fs2.Stream itself and simply receive F[List[Int]] batches as soon as they are ready.

Attempts: I tried to use fs2.Stream::take and ran some examples.

I.

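import cats.effect.{ContextShift, IO}
import cats.implicits._ // flatTap, iterateWhile
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)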

val r = for {
  queue <- fs2.concurrent.Queue.unbounded[IO, Int]
  stream = queue.dequeue
  _ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
  _ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst))).iterateWhile(_.nonEmpty)
} yield ()

r.unsafeRunSync()

It prints the very first batch, List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), and then hangs. I expected all batches covering the range 0 until 1000 to be printed.

Here is a somewhat simpler example:

II.

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val r = for {
  queue <- fs2.concurrent.Queue.unbounded[IO, Int]
  stream = queue.dequeue
  _ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
  _ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst)))
  _ <- stream.take(20).compile.toList.flatTap(lst => IO(println(lst)))
} yield ()

r.unsafeRunSync()

The behavior is exactly the same as in I: it prints List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) and then hangs.

Question:

Given an fs2.Stream[IO, Int], how can I provide an effect IO[List[Int]] that, each time it is evaluated, yields the next consecutive batch from the stream?


Solution

  • Well, you cannot have an IO[List[X]] that represents multiple batches; that IO would represent a single batch.

    The best you can do is something like this:

    def processByBatches(process: List[Int] => IO[Unit]): IO[Unit]
    

    That is, your users give you an operation to execute for each batch, and you give them back an IO that semantically blocks the current fiber until the whole stream has been consumed using that function.

    And the simplest way to implement such a function would be:

    def processByBatches(process: List[Int] => IO[Unit]): IO[Unit] =
      getStreamFromThirdParty
        .chunkN(n = ChunkSize)
        .evalMap(chunk => process(chunk.toList))
        .compile
        .drain
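
To make the approach above easier to try out, here is a minimal self-contained sketch. It assumes cats-effect 2.x and fs2 2.x, as in the question. The object name BatchDemo, the stand-in stream built with Stream.range, the value ChunkSize = 10, and the collectBatches helper are all hypothetical and only there for illustration; the real stream would come from the third-party library.

```scala
import cats.effect.IO
import fs2.Stream
import scala.collection.mutable.ListBuffer

object BatchDemo {
  // Hypothetical stand-in for the stream provided by the third-party library.
  val getStreamFromThirdParty: Stream[IO, Int] =
    Stream.range(0, 25).covary[IO]

  // Hypothetical batch size.
  val ChunkSize: Int = 10

  // Runs `process` once per batch, in stream order, until the stream ends.
  def processByBatches(process: List[Int] => IO[Unit]): IO[Unit] =
    getStreamFromThirdParty
      .chunkN(ChunkSize) // the last chunk may hold fewer than ChunkSize elements
      .evalMap(chunk => process(chunk.toList))
      .compile
      .drain

  // Illustration only: records every batch the callback receives.
  def collectBatches(): List[List[Int]] = {
    val seen = ListBuffer.empty[List[Int]]
    processByBatches(batch => IO { seen += batch; () }).unsafeRunSync()
    seen.toList
  }
}
```

A caller would typically pass something like `batch => IO(println(batch))` to processByBatches. With the stand-in stream above, the callback is invoked three times: twice with 10 elements and once with the remaining 5.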