scalazio

how to race multiple zio effects and get first 2 that completes


zio has method race which returns the first winner, and interrupts all "losers".

How to get the first two or n "winners" that finishes first, and interrupts all others?

The function signature looks like below, where n is the number of "winners", (not the max parallelism). The output list has size n.

final def raceN[R, E, A](n: Int)(as: Iterable[ZIO[R, E, A]]): ZIO[R, Nothing, List[Either[E,A]]]

One use case i can think of is load the same data from different source in parallel, only get the first 2 that returns first, and compare their content to make sure they are the same. If different, fail.


Solution

  • You can use the Structured Concurrency primitives that ZIO offers to build this, but keep in mind that you need to think of cases like "long running effects that do not complete within reasonable time" (hint: may be you need a timeout).

    Here is a short implementation of the raceN

    object MyApp extends ZIOAppDefault:
    
      final def raceN[R, E, A](n: Int)(as: Iterable[ZIO[R, E, A]]): ZIO[R, Nothing, List[Either[E, A]]] =
        for {
          results    <- Ref.make(List.empty[Either[E, A]])
          latch      <- CountdownLatch.make(n)
          fibers     <- ZIO.collectAll {
                          as.map { (eff: ZIO[R, E, A]) =>
                            (eff
                              .either
                              .flatMap(effRes => results.update(effRes :: _))
                              *> latch.countDown).fork
                          }
                        }
          _          <- latch.await
          _          <- ZIO.foreachDiscard(fibers)(_.interrupt)
          raceResult <- results.get
        } yield raceResult
    
      override val run: UIO[ExitCode] =
        val effects = (1 to 5).map(n => ZIO.sleep(n.seconds).as(n).flatMap(n => ZIO.cond(n % 2 == 1, n, s"error: $n")))
        raceN(2)(effects).flatMap(res => Console.printLine(res.toString).orDie) *> ZIO.succeed(ExitCode.success)