scalastreamingfs2

pass through mechanism with fs2 Stream


Would anyone know how to reproduce a pass through mechanism (as with Akka Stream) but with fs2 Stream ?

I would like to be able to split the values of on Object into 2 substreams, zipping them back together when the 2 substreams emit values.

One of the substreams would do nothing, only waiting for the other stream to emit a value and zip both values together.

case class Message(offset: Int, record: String)

Stream[Message] ->(offset goes here)Pipe[Int, Int]        -> (zip outputs)Pipe[Message, Message]
                -> (record goes here)Pipe[String, String]

Solution

  • Sadly what you want to do doesn't make sense from the stream point of view.
    First, you can't split a Stream into two independent ones since the life cycle of one depends on the life cycle of the other.
    Second, if you expose the whole Stream to your users, then they would be able to call a lot of things that may increase or reduce the number of elements, thus you would lose the one-to-one semantics.

    I can think of three "solutions" to your problem.

    1. Don't do it

    I know, I know. This is not a solution per se, but is a valid option.
    Take a look to libraries like fs2-kafka they don't hide the off-set, rather they force their users to keep track of it.

    2. Create your own Stream projection

    Create a custom class that internally contains a Stream[Message] but that exposes only one-to-one methods and hides the offset. Then you can ask for a function using that data type.

    3. Concurrency with Queues (dangerous)

    The only way I know of splitting streams is by using Queues (the cats-effect ones).

    The idea is simple, first, you are going to use evalMap to send each offset into a Queue and then just project the message. Then, you can use through on that stream using the Pipe the user provided. Finally, you use another evalMap to retrieve the offset again from the Queue.

    Again, this may lead to corruption if users break the one-to-one semantics.


    Anyways, I would recommend also asking on the typelevel Discord server, folks there may have a different / better advice.