scala cats-effect fs2

FS2 through2 closing all resources when the first stream is finished?


Let's suppose we have 2 fs2 Streams:

import cats.effect.IO
import fs2.Pull

val stream1 = fs2.Stream.bracket(IO { println("Acquire 1"); 2 })(_ => IO { println("Release 1") })
  .flatMap(p => fs2.Stream.range(1, p))

val stream2 = fs2.Stream.bracket(IO { println("Acquire 2"); 4 })(_ => IO { println("Release 2") })
  .flatMap(p => fs2.Stream.range(1, p))

which I would like to connect with each other:

def connect[F[_]]: (fs2.Stream[F, Int], fs2.Stream[F, Int]) => fs2.Stream[F, Int] = {
  def go(stream1: fs2.Stream[F, Int], stream2: fs2.Stream[F, Int]): Pull[F, Int, Unit] =
    stream1.pull.uncons1.flatMap { stream1Element =>
      stream2.pull.uncons1.flatMap { stream2Element =>
        (stream1Element, stream2Element) match {
          // Both streams produced an element: emit the sum and continue with both tails.
          case (Some((stream1Head, stream1Tail)), Some((stream2Head, stream2Tail))) =>
            println("Some, Some")
            Pull.output1(stream1Head + stream2Head) >> go(stream1Tail, stream2Tail)
          // Only stream 1 produced an element. Its tail is passed in the second slot,
          // so from here on it is handled by the (None, Some) branch.
          case (Some((stream1Head, stream1Tail)), None) =>
            println("1 Stream still available")
            Pull.output1(stream1Head) >> go(fs2.Stream.empty, stream1Tail)
          // Only stream 2 produced an element.
          case (None, Some((stream2Head, stream2Tail))) =>
            println("2 Stream still available")
            Pull.output1(stream2Head) >> go(fs2.Stream.empty, stream2Tail)
          // Both streams are exhausted: emit -1 as an end marker.
          case _ => Pull.output1(-1)
        }
      }
    }

  (one, two) => go(one, two).stream
}

Now, checking the logs, I see:

Acquire 1
Acquire 2
Some, Some
Release 2
Release 1
2 Stream still available
2 Stream still available

This surprises me: as soon as the first stream finishes, the resource of the second stream is released as well, even though the second stream still has elements to emit. If the resource were a database connection, the remaining elements of the second stream could no longer be fetched.

Is this the correct behavior? Is there a way to avoid releasing the second stream's resource early? Interestingly, if the first stream has more elements than the second one, everything works as expected (stream 1's resource is not released when the second stream finishes).


Solution

  • Checking the implementation of zipAllWith showed that uncons1 should indeed be avoided in cases like this. The fix is to use stepLeg instead of uncons1: stepLeg holds the scope of each stream (leg) independently of the scopes of the other legs, which is what zipping several streams needs. The function above then becomes:

    def connect[F[_]]: (fs2.Stream[F, Int], fs2.Stream[F, Int]) => fs2.Stream[F, Int] = {
      def go(stream1: fs2.Stream[F, Int], stream2: fs2.Stream[F, Int]): Pull[F, Int, Unit] =
        stream1.pull.stepLeg.flatMap { stream1Element =>
          stream2.pull.stepLeg.flatMap { stream2Element =>
            (stream1Element, stream2Element) match {
              // Both legs produced a chunk: emit the sum and continue with both legs.
              case (Some(sl1), Some(sl2)) =>
                println("Some, Some")
                val one = sl1.head(0)
                val two = sl2.head(0)
                Pull.output1(one + two) >> go(sl1.stream, sl2.stream)
              // Only stream 1 still has elements.
              case (Some(sl1), None) =>
                val one = sl1.head(0)
                println("1 Stream still available")
                Pull.output1(one) >> go(sl1.stream, fs2.Stream.empty)
              // Only stream 2 still has elements.
              case (None, Some(sl2)) =>
                val two = sl2.head(0)
                println("2 Stream still available")
                Pull.output1(two) >> go(fs2.Stream.empty, sl2.stream)
              // Both streams are exhausted: emit -1 as an end marker.
              case _ => Pull.output1(-1)
            }
          }
        }

      // flatMap(fs2.Stream.emit) re-emits every element as its own chunk,
      // so head(0) above never drops the rest of a larger chunk.
      (one, two) => go(one.flatMap(fs2.Stream.emit), two.flatMap(fs2.Stream.emit)).stream
    }
    

    And the logs:

    Acquire 1
    Acquire 2
    Some, Some
    Release 1
    2 Stream still available
    2 Stream still available
    Release 2
    

    An additional example of this issue can be found here: uncons vs stepLeg
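
If the hand-written Pull is not strictly needed, the built-in zipAllWith mentioned above may already cover this use case. The following is only a minimal sketch under some assumptions: fs2 3.x with cats-effect 3 on the classpath, and a context that allows top-level definitions (REPL, worksheet, or Scala 3). The helper `tracked` and its open flag are hypothetical and stand in for something like a database connection; each element read fails if the resource has already been released.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // cats-effect 3 runtime (assumption)
import fs2.Stream
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: emits 1 until n (exclusive) while holding a resource.
// The flag models "the connection is open"; reading after release fails.
def tracked(name: String, n: Int): Stream[IO, Int] = {
  val open = new AtomicBoolean(false)
  Stream
    .bracket(IO { println(s"Acquire $name"); open.set(true); n })(_ =>
      IO { println(s"Release $name"); open.set(false) }
    )
    .flatMap { p =>
      Stream.range(1, p).covary[IO].evalMap { i =>
        if (open.get) IO.pure(i)
        else IO.raiseError[Int](new IllegalStateException(s"resource $name already released"))
      }
    }
}

// Pads the shorter side with 0, so leftover elements pass through unchanged.
val result: List[Int] =
  tracked("1", 2)
    .zipAllWith(tracked("2", 4))(0, 0)(_ + _)
    .compile
    .toList
    .unsafeRunSync()

println(result) // List(2, 2, 3)
```

Unlike connect above, this variant does not emit the trailing -1 marker once both streams are exhausted; if that marker is needed, it could be appended with `++ Stream.emit(-1)`.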