scala, future, task

Trying to execute futures sequentially. What is wrong?


I am trying to sequentially execute a function that returns a Future.

I have a collection:

  val in = Seq(1, 1, -1, -2, 3, -4, 5, 6, 7, -1, -2, -9, 1, 2, 2)

and a function to process every Int in this collection:

  def intToFuture(int: Int): Future[Int] = {
    Future {
      println(s"Running function $int")
      Thread.sleep(1500)
      int * 100
    }
  }

I need to process the collection in batches, with the elements of each batch processed in parallel: take the first n elements, multiply each by 100 in parallel, then take the next n elements and do the same, and so on.

After reading some posts on this site, I implemented two functions:

1) one to process a single batch of calculations

  def processBatch(ints: Seq[Int])(f: Int => Future[Int]): Future[Seq[Int]] = {
    Future.sequence(ints.map(f))
  }

2) and a second one, which applies it to the batches one after another

  def batchTraverse(in: Seq[Int], size: Int)(f: Int => Future[Int]): Future[Seq[Int]] = {
    val grs = in.grouped(size).toList
    def loop(l: Seq[Seq[Int]]): Future[Seq[Int]] = {
      l match {
        case Nil =>
          Future.successful(l.flatten)//? flatten
        case head :: tail =>
          println("head="+head)
          processBatch(head)(f).flatMap{
            s => loop(tail).map{ t =>
              s.appendedAll(t)
            }
          }
      }
    }
    loop(grs)
  }

I start it with:

  val fs: Future[Seq[Int]] = batchTraverse(in, 3)(intToFuture)

  fs.onComplete{
    f => println(f)
  }

As a result, it performs only one iteration. Where did I go wrong?


Solution

  • Your function actually seems to work fine. Most likely your program terminates before the future gets a chance to complete, so you only see the first iteration. After adding an Await to your code, I was able to get things to work:

    import scala.concurrent._
    import scala.concurrent.duration._
    
    val fs: Future[Seq[Int]] = batchTraverse(in, 3)(intToFuture)
    
    fs.onComplete{
      f => println(f)
    }
    
    Await.result(fs, Duration.Inf)
    

    You'll likely want a duration smaller than Duration.Inf, since Duration.Inf waits indefinitely for the future to complete. With this change I got the following output:

    head=List(1, 1, -1)
    Running function 1
    Running function 1
    Running function -1
    head=List(-2, 3, -4)
    Running function -4
    Running function 3
    Running function -2
    head=List(5, 6, 7)
    Running function 7
    Running function 6
    Running function 5
    head=List(-1, -2, -9)
    Running function -9
    Running function -2
    Running function -1
    head=List(1, 2, 2)
    Running function 2
    Running function 2
    Running function 1
    Success(List(100, 100, -100, -200, 300, -400, 500, 600, 700, -100, -200, -900, 100, 200, 200))
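    As a follow-up to the remark about Duration.Inf, here is a minimal sketch of waiting with a finite timeout instead. The object BoundedAwait and the helper awaitBounded are hypothetical names introduced only for this illustration; they are not part of the original code. The sketch relies only on Await.result, which throws a TimeoutException when the time limit is exceeded.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object BoundedAwait {
  // Hypothetical helper (not part of the original code): block the calling
  // thread until `fs` completes or `timeout` elapses, whichever comes first.
  // Await.result throws a TimeoutException on timeout and rethrows the
  // future's own exception on failure; Try captures either as a Failure.
  def awaitBounded[A](fs: Future[A], timeout: FiniteDuration): Try[A] =
    Try(Await.result(fs, timeout))
}
```

    With this in place, the Await.result(fs, Duration.Inf) call could be replaced by something like BoundedAwait.awaitBounded(fs, 30.seconds). The 30-second limit is an arbitrary assumption: five batches at about 1.5 seconds each should finish in roughly 7.5 seconds if the thread pool can run a whole batch in parallel, so the limit only needs to be comfortably larger than that.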