scalafuturescala-2.12

Scala Future.find


Scala 2.12 has 2 Future.find methods.

@deprecated("use the overloaded version of this method that takes a scala.collection.immutable.Iterable instead", "2.12.0")
def find[T](@deprecatedName('futurestravonce) futures: TraversableOnce[Future[T]])(@deprecatedName('predicate) p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]]

And its overloaded version

def find[T](futures: scala.collection.immutable.Iterable[Future[T]])(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]]

Both of them has the same description

  /** Asynchronously and non-blockingly returns a `Future` that will hold the optional result
   *  of the first `Future` with a result that matches the predicate, failed `Future`s will be ignored.
   *
   * @tparam T        the type of the value in the future
   * @param futures   the `scala.collection.immutable.Iterable` of Futures to search
   * @param p         the predicate which indicates if it's a match
   * @return          the `Future` holding the optional result of the search
   */

So I'm assuming these methods find first completed Future that matches param p in a given List

But only first one is actually doing so.

  val start = System.currentTimeMillis
  val a = (1 to 3).reverse.iterator.map{ x =>
    Future{
      Thread.sleep(x * 10000)
      x
    }
  }
  val b = Future.find(a)(_.isInstanceOf[Int])
  b.foreach{ x =>
    println(x)
    println(System.currentTimeMillis - start) // 10020 
  }

The deprected version of the method returns the fastest one.

  val a = (1 to 3).reverse.map{ x =>
    Future{
      Thread.sleep(x * 10000)
      x
    }
  }
  val b = Future.find(a)(_.isInstanceOf[Int])
  b.foreach{ x =>
    println(x)
    println(System.currentTimeMillis - start)
  }

The overloaded version returns the slowest one. To be more precise, It simply checks a given list from head to tail and it doesn't care how long they takes to be completed.

Is this how it's supposed to be? If so, Is using the duplicated one or implementing it myself only option to care about their times to be completed?


Solution

  • You're right that the deprecated Future.find which expects a TraversableOnce[Future[T]] in 2.12.x does behave differently from the replacing Future.find. As you can see from the pasted source code below, the former find method utilizes Promise with tryComplete to effectively capture the first completed future from the input collection, whereas the latter employs a simple hasNext/next traversal:

    @deprecated("use the overloaded version of this method that takes a scala.collection.immutable.Iterable instead", "2.12.0")
    def find[T](@deprecatedName('futurestravonce) futures: TraversableOnce[Future[T]])(@deprecatedName('predicate) p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
      val futuresBuffer = futures.toBuffer
      if (futuresBuffer.isEmpty) successful[Option[T]](None)
      else {
        val result = Promise[Option[T]]()
        val ref = new AtomicInteger(futuresBuffer.size)
        val search: Try[T] => Unit = v => try {
          v match {
            case Success(r) if p(r) => result tryComplete Success(Some(r))
            case _ =>
          }
        } finally {
          if (ref.decrementAndGet == 0) {
            result tryComplete Success(None)
          }
        }
    
        futuresBuffer.foreach(_ onComplete search)
    
        result.future
      }
    }
    
    def find[T](futures: scala.collection.immutable.Iterable[Future[T]])(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
      def searchNext(i: Iterator[Future[T]]): Future[Option[T]] =
        if (!i.hasNext) successful[Option[T]](None)
        else {
          i.next().transformWith {
            case Success(r) if p(r) => successful(Some(r))
            case other => searchNext(i)
          }
        }
      searchNext(futures.iterator)
    }
    

    One approach to implementing your own might be to expand the Future.firstCompletedOf method with the added predicate into something like the following:

    def firstConditionallyCompletedOf[T](futures: List[Future[T]])(p: T => Boolean)(implicit ec: ExecutionContext): Future[T] = {
      val p = Promise[T]()
      val firstCompleteHandler = new AtomicReference[Promise[T]](p) with (Try[T] => Unit) {
        override def apply(v1: Try[T]): Unit = getAndSet(null) match {
          case null => ()
          case some => some tryComplete v1
        }
      }
      futures.foreach{ _.filter(condition).onComplete(firstCompleteHandler) }
      p.future
    }