ZIO has a method race, which returns the first winner and interrupts all "losers".
How do I get the first two, or n, "winners" that finish first and interrupt all the others?
The function signature looks like the one below, where n is the number of "winners" (not the maximum parallelism). The output list has size n.
final def raceN[R, E, A](n: Int)(as: Iterable[ZIO[R, E, A]]): ZIO[R, Nothing, List[Either[E,A]]]
One use case I can think of is loading the same data from several sources in parallel, keeping only the first two that return, and comparing their contents to make sure they are the same. If they differ, fail.
You can build this from the structured concurrency primitives that ZIO offers, but keep in mind cases such as long-running effects that do not complete within a reasonable time (hint: you may need a timeout).
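One way to act on the timeout hint is to bound the wait itself. The sketch below assumes ZIO 2 and Scala 3; raceNWithin is a hypothetical helper name, not a ZIO API. It records outcomes in a Ref, completes a Promise once n of them have arrived, and stops waiting after limit, so the returned list may be shorter than n.

```scala
import zio.*

object TimedRace:

  /** Hypothetical helper (not a ZIO API): like raceN, but gives up after `limit`
    * and returns whatever has completed by then.
    */
  def raceNWithin[R, E, A](n: Int, limit: Duration)(
    as: Iterable[ZIO[R, E, A]]
  ): URIO[R, List[Either[E, A]]] =
    val effects = as.toList
    val wanted  = n.min(effects.size) // cannot collect more outcomes than there are effects

    // Append one outcome and wake the waiter once enough outcomes are recorded.
    def record(results: Ref[Vector[Either[E, A]]], done: Promise[Nothing, Unit])(
      r: Either[E, A]
    ): UIO[Unit] =
      results.updateAndGet(_ :+ r).flatMap(rs => done.succeed(()).when(rs.size >= wanted)).unit

    if wanted <= 0 then ZIO.succeed(Nil)
    else
      for
        results <- Ref.make(Vector.empty[Either[E, A]])
        done    <- Promise.make[Nothing, Unit]
        fibers  <- ZIO.foreach(effects)(eff => eff.either.flatMap(r => record(results, done)(r)).fork)
        _       <- done.await.timeout(limit) // yields None on timeout; either way we stop waiting
        _       <- Fiber.interruptAll(fibers) // interrupt the losers (and any hung effects)
        all     <- results.get
      yield all.take(wanted).toList
```

A call such as TimedRace.raceNWithin(2, 3.seconds)(effects) would then return at most two outcomes in completion order, even if every remaining effect hangs. Whether a short list should count as a failure is left to the caller.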
Here is a short implementation of raceN:
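import zio.*
import zio.concurrent.CountdownLatch

object MyApp extends ZIOAppDefault:

  final def raceN[R, E, A](n: Int)(as: Iterable[ZIO[R, E, A]]): ZIO[R, Nothing, List[Either[E, A]]] =
    for {
      // Outcomes are prepended as each effect finishes, so this list is newest-first.
      results <- Ref.make(List.empty[Either[E, A]])
      // CountdownLatch.make fails for n <= 0; treat that as a defect so the error type stays Nothing.
      latch <- CountdownLatch.make(n).orDieWith(_ => new IllegalArgumentException(s"n must be positive: $n"))
      fibers <- ZIO.collectAll {
        as.map { (eff: ZIO[R, E, A]) =>
          (eff
            .either
            .flatMap(effRes => results.update(effRes :: _))
            *> latch.countDown).fork
        }
      }
      // Waits until n effects have finished; this never returns if n > as.size.
      _ <- latch.await
      _ <- ZIO.foreachDiscard(fibers)(_.interrupt)
      // Restore completion order and drop extra outcomes recorded before the interrupts landed.
      raceResult <- results.get.map(_.reverse.take(n))
    } yield raceResult

  override val run: UIO[ExitCode] =
    val effects = (1 to 5).map(n => ZIO.sleep(n.seconds).as(n).flatMap(n => ZIO.cond(n % 2 == 1, n, s"error: $n")))
    // Prints List(Right(1), Left(error: 2)) after about two seconds.
    raceN(2)(effects).flatMap(res => Console.printLine(res.toString).orDie) *> ZIO.succeed(ExitCode.success)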
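The use case from the question (load the same data from several sources, keep the first two answers, fail if they differ) can be sketched on top of any raceN with this signature. To stay self-contained, the block below uses a compact Queue-based stand-in for raceN rather than repeating the latch version. The names load, sources, and firstTwoAgree are hypothetical, and the String error channel is an assumption made for illustration.

```scala
import zio.*

object CompareSources extends ZIOAppDefault:

  // Compact stand-in for raceN: outcomes are queued in completion order.
  def raceN[R, E, A](n: Int)(as: Iterable[ZIO[R, E, A]]): URIO[R, List[Either[E, A]]] =
    for
      queue   <- Queue.unbounded[Either[E, A]]
      fibers  <- ZIO.foreach(as.toList)(eff => eff.either.flatMap(r => queue.offer(r)).fork)
      winners <- queue.takeN(n.max(0).min(fibers.size)) // never ask for more than can arrive
      _       <- Fiber.interruptAll(fibers)
    yield winners.toList

  // Succeeds with the shared value only if the first two outcomes are equal successes.
  def firstTwoAgree[R, A](sources: Iterable[ZIO[R, String, A]]): ZIO[R, String, A] =
    raceN(2)(sources).flatMap {
      case List(Right(x), Right(y)) if x == y => ZIO.succeed(x)
      case List(Right(x), Right(y))           => ZIO.fail(s"mismatch: $x vs $y")
      case other                              => ZIO.fail(s"could not compare: $other")
    }

  // Hypothetical source: pretends to fetch `payload` after `delay`.
  def load(delay: Duration, payload: String): IO[String, String] =
    ZIO.sleep(delay).as(payload)

  // Three replicas of the same data; the third is too slow to be among the first two.
  val sources = List(load(30.millis, "v1"), load(10.millis, "v1"), load(5.seconds, "v1"))

  val run = firstTwoAgree(sources).flatMap(v => Console.printLine(s"agreed on $v").orDie)
```

Here the slow third source is interrupted as soon as the two fast ones have answered, so the program finishes in tens of milliseconds rather than five seconds. A failed source among the first two is reported as "could not compare"; retrying with the next winner instead would be a different policy.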