Scala 2.12 has two Future.find methods:
@deprecated("use the overloaded version of this method that takes a scala.collection.immutable.Iterable instead", "2.12.0")
def find[T](@deprecatedName('futurestravonce) futures: TraversableOnce[Future[T]])(@deprecatedName('predicate) p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]]
and its overloaded version:
def find[T](futures: scala.collection.immutable.Iterable[Future[T]])(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]]
Both of them have the same description:
/** Asynchronously and non-blockingly returns a `Future` that will hold the optional result
 *  of the first `Future` with a result that matches the predicate, failed `Future`s will be ignored.
 *
 * @tparam T        the type of the value in the future
 * @param futures   the `scala.collection.immutable.Iterable` of Futures to search
 * @param p         the predicate which indicates if it's a match
 * @return          the `Future` holding the optional result of the search
 */
So I'm assuming these methods find the first completed Future in a given list that matches the predicate p. But only the first one actually does so.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val start = System.currentTimeMillis
val a = (1 to 3).reverse.iterator.map { x =>
  Future {
    Thread.sleep(x * 10000)
    x
  }
}
val b = Future.find(a)(_.isInstanceOf[Int])
b.foreach { x =>
  println(x)
  println(System.currentTimeMillis - start) // 10020
}
The deprecated version of the method returns the fastest one.
val a = (1 to 3).reverse.map { x =>
  Future {
    Thread.sleep(x * 10000)
    x
  }
}
val b = Future.find(a)(_.isInstanceOf[Int])
b.foreach { x =>
  println(x)
  println(System.currentTimeMillis - start)
}
The overloaded version returns the slowest one. To be more precise, it simply checks the given list from head to tail and doesn't care how long each Future takes to complete.
Is this how it's supposed to be? If so, is using the deprecated one or implementing it myself the only option if I care about completion order?
You're right that the deprecated Future.find, which expects a TraversableOnce[Future[T]] in 2.12.x, behaves differently from the Future.find that replaces it. As you can see from the source code pasted below, the former uses a Promise with tryComplete to capture the first completed future from the input collection that satisfies the predicate, whereas the latter walks the collection with a simple hasNext/next traversal, looking at each future only after the one before it has completed:
@deprecated("use the overloaded version of this method that takes a scala.collection.immutable.Iterable instead", "2.12.0")
def find[T](@deprecatedName('futurestravonce) futures: TraversableOnce[Future[T]])(@deprecatedName('predicate) p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
  val futuresBuffer = futures.toBuffer
  if (futuresBuffer.isEmpty) successful[Option[T]](None)
  else {
    val result = Promise[Option[T]]()
    val ref = new AtomicInteger(futuresBuffer.size)
    val search: Try[T] => Unit = v => try {
      v match {
        case Success(r) if p(r) => result tryComplete Success(Some(r))
        case _ =>
      }
    } finally {
      if (ref.decrementAndGet == 0) {
        result tryComplete Success(None)
      }
    }
    futuresBuffer.foreach(_ onComplete search)
    result.future
  }
}
def find[T](futures: scala.collection.immutable.Iterable[Future[T]])(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
  def searchNext(i: Iterator[Future[T]]): Future[Option[T]] =
    if (!i.hasNext) successful[Option[T]](None)
    else {
      i.next().transformWith {
        case Success(r) if p(r) => successful(Some(r))
        case other => searchNext(i)
      }
    }
  searchNext(futures.iterator)
}
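To see the sequential traversal in isolation, here is a small sketch with much shorter delays than in the question. The names FindOrderDemo, delayed and firstMatch are made up for this illustration, and the delays are arbitrary. Because the Iterable overload inspects futures in iteration order, the result should not depend on which future finishes first.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FindOrderDemo {
  // Hypothetical helper: a Future that yields `value` after roughly `delayMs` milliseconds.
  def delayed(value: Int, delayMs: Long): Future[Int] = Future {
    Thread.sleep(delayMs)
    value
  }

  // Runs the non-deprecated `find` over futures listed slowest-first.
  def firstMatch(p: Int => Boolean): Option[Int] = {
    // Iteration order: 3 (300 ms), 2 (200 ms), 1 (100 ms).
    val futures: List[Future[Int]] = List(3, 2, 1).map(x => delayed(x, x * 100L))
    // A List is an immutable.Iterable, so the Iterable overload is selected.
    Await.result(Future.find(futures)(p), 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    println(firstMatch(_ > 0)) // Some(3): the head of the list, not the fastest future
    println(firstMatch(_ < 3)) // Some(2): the first match in iteration order
  }
}
```

In both calls the future holding 1 finishes earliest, yet it is never the answer, because an earlier element of the list already matches.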
One approach to implementing your own might be to extend the Future.firstCompletedOf method with a predicate, along these lines:
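import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

def firstConditionallyCompletedOf[T](futures: List[Future[T]])(p: T => Boolean)(implicit ec: ExecutionContext): Future[T] = {
  // Named `promise` to keep it distinct from the predicate `p`.
  val promise = Promise[T]()
  val firstCompleteHandler = new AtomicReference[Promise[T]](promise) with (Try[T] => Unit) {
    override def apply(v1: Try[T]): Unit = getAndSet(null) match {
      case null => ()
      case some => some tryComplete v1
    }
  }
  // `filter(p)` turns a non-matching result into a failed Future (NoSuchElementException);
  // the handler completes `promise` with whichever filtered Future finishes first.
  futures.foreach { _.filter(p).onComplete(firstCompleteHandler) }
  promise.future
}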
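One caveat with the filter-based approach: filter turns a non-matching result into a failed future, so a fast future that does not satisfy the predicate can still be the first to complete. If you would rather ignore failures and non-matches, as the deprecated find does, something like the sketch below might work. It is only an illustration modelled on the deprecated source above. The names CompletionOrderFind, findFirstCompleted and demo are made up, and the demo uses hand-completed promises instead of sleeps so the completion order is under our control.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Success

object CompletionOrderFind {
  // Hypothetical helper: completes with the first *completed* future whose value
  // satisfies `p`, or with None once every future has completed without a match.
  def findFirstCompleted[T](futures: Seq[Future[T]])(p: T => Boolean)(
      implicit ec: ExecutionContext): Future[Option[T]] =
    if (futures.isEmpty) Future.successful(None)
    else {
      val result = Promise[Option[T]]()
      val remaining = new AtomicInteger(futures.size)
      futures.foreach { f =>
        f.onComplete { t =>
          try {
            t match {
              case Success(v) if p(v) => result.trySuccess(Some(v)) // first match wins
              case _                  => ()                         // failures and non-matches are ignored
            }
          } finally {
            // The last callback to run settles the result as None if nothing matched.
            if (remaining.decrementAndGet() == 0) result.trySuccess(None)
          }
        }
      }
      result.future
    }

  // Completes the second future in the list before the first one.
  def demo(): Option[Int] = {
    import ExecutionContext.Implicits.global
    val slow = Promise[Int]()
    val fast = Promise[Int]()
    val found = findFirstCompleted(List(slow.future, fast.future))(_ > 0)
    fast.success(1) // second in the list, but completed first
    val winner = Await.result(found, 5.seconds)
    slow.success(3) // completes afterwards; the result is already decided
    winner
  }

  def main(args: Array[String]): Unit =
    println(demo()) // Some(1)
}
```

Since it does not rely on the deprecated overload, this should also compile on versions where that overload is no longer available.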