I am trying to use an impure ("java") API in the context of a cats-effect IO-App. The impure API looks somewhat like this:
import io.reactivex.Flowable
import java.util.concurrent.CompletableFuture

trait ImpureProducer[A] {
  /** The produced output - must be subscribed to before calling startProducing(). */
  def output: Flowable[A]

  /** Makes the producer start publishing its output to any subscribers of `output`. */
  def startProducing(): CompletableFuture[Unit]
}
(Of course, there are more methods including stopProducing(), but those are not relevant for my question.)
My (maybe naive) approach to adapting that API looks as follows (leveraging the fact that Flowable is an org.reactivestreams.Publisher):
import cats.effect.IO
import fs2.Stream
import fs2.interop.reactivestreams.*

class AdaptedProducer[A](private val underlying: ImpureProducer[A]) {
  def output: Stream[IO, A] =
    underlying.output.toStreamBuffered(1)

  def startProducing: IO[Unit] =
    IO.fromCompletableFuture(IO(underlying.startProducing()))
}
My question is: how can I ensure that the output stream is subscribed to before I evaluate startProducing?
For example, how could I fix the following attempt to obtain an IO of the very first item that is produced:
import cats.Parallel
import cats.effect.IO

def firstOutput[A](producer: AdaptedProducer[A]): IO[A] = {
  val firstOut: IO[A] = producer.output.take(1).compile.onlyOrError
  // This introduces a race condition: it is not ensured that the output stream
  // will already be subscribed to when startProducing is evaluated.
  Parallel[IO].parProductL(firstOut)(producer.startProducing)
}
This can be done using the new Java Flow interop (fs2.interop.flow).
Note that the reactive-streams interop has been deprecated since the flow one was added.
The API was redesigned to handle more cases (like this one) and the implementation was re-done from scratch to be more efficient.
If the API you are using doesn't have a Flow-based version, you can use FlowAdapters to wrap it.
The code would look like this:
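import cats.effect.IO
import fs2.Stream
import org.reactivestreams.FlowAdapters

final class AdaptedProducer[A](underlying: ImpureProducer[A], chunkSize: Int) {
  val run: Stream[IO, A] =
    fs2.interop.flow.fromPublisher[IO, A](chunkSize) { subscriber =>
      // fs2 hands us a java.util.concurrent.Flow.Subscriber, while Flowable expects an
      // org.reactivestreams.Subscriber, so adapt it with FlowAdapters.toSubscriber.
      IO(
        underlying.output.subscribe(
          FlowAdapters.toSubscriber(subscriber)
        )
      ) >>
        // Start producing only after the subscription has been registered.
        IO.fromCompletableFuture(IO(underlying.startProducing()))
    }
}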
This ensures that startProducing is called only when you actually start consuming the Stream, and only after subscribe has been called.
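For the firstOutput example from the question, the adapted producer could then be used roughly as sketched below. This is only a sketch: it assumes the AdaptedProducer class from this answer (with its run stream) is in scope, and that the CompletableFuture returned by startProducing() completes once production has started rather than when it has finished.

```scala
import cats.effect.IO

// Sketch only: relies on the AdaptedProducer defined above being in scope.
// No Parallel is needed any more, because subscribing and starting the producer
// both happen, in that order, when the stream itself is run.
def firstOutput[A](producer: AdaptedProducer[A]): IO[A] =
  producer.run.take(1).compile.onlyOrError
```

Since the subscription and the call to startProducing are sequenced inside the stream, there is nothing left to race against.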
Hope this helps :D