
Scala: Parallel execution with ListBuffer appends doesn't produce the expected outcome


I know I'm doing something wrong with mutable.ListBuffer, but I can't figure out how to fix it (or find a proper explanation of the issue).

I simplified the code below to reproduce the behavior.

I'm basically trying to run functions in parallel that add elements to a list as my first list gets processed. I end up "losing" elements.

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import ExecutionContext.Implicits.global


object MyTestObject {

  var listBufferOfInts = new ListBuffer[Int]() // shared buffer that every future appends to

  def runFunction(): Int = {
    listBufferOfInts = new ListBuffer[Int]()
    val inputListOfInts = 1 to 1000
    val fut = Future.traverse(inputListOfInts) { i =>
      Future {
        appendElem(i)
      }
    }
    Await.ready(fut, Duration.Inf)
    listBufferOfInts.length
  }

  def appendElem(elem: Int): Unit = {
    listBufferOfInts ++= List(elem)
  }
}

MyTestObject.runFunction()
MyTestObject.runFunction()
MyTestObject.runFunction()

which returns:

res0: Int = 937
res1: Int = 992
res2: Int = 997

Obviously I would expect 1000 to be returned every time. How can I fix my code so that it keeps the same "architecture" but makes my ListBuffer "synchronized"?


Solution

  • I don't know what the exact problem is, since you said you simplified the code, but you still have an obvious race condition: multiple threads modify a single mutable collection without any coordination, and that is very bad. ListBuffer is not thread-safe, so two concurrent appends can overwrite each other's update, which is how elements get "lost". As other answers pointed out, you need some locking so that only one thread can modify the collection at a time. If your calculations are heavy, appending results to a buffer in a synchronized way shouldn't noticeably affect performance, but when in doubt, always measure.

    But you don't actually need synchronization; you can avoid vars and mutable state altogether. Let each Future return its partial result and then merge the results into a list. In fact, Future.traverse does exactly that.

    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    
    def runFunction: Int = {
      val inputListOfInts = 1 to 1000
    
      val fut: Future[List[Int]] = Future.traverse(inputListOfInts.toList) { i =>
        Future { 
          // some heavy calculations on i
          i * 4
        }
      }
    
      val listOfInts = Await.result(fut, Duration.Inf)
      listOfInts.size
    }
    

    Future.traverse already gives you an immutable list with all your results combined, so there is no need to append them to a mutable buffer. Needless to say, you will always get 1000 back:

    @ List.fill(10000)(runFunction).exists(_ != 1000) 
    res18: Boolean = false
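If you do want to keep the original "architecture" with a shared buffer, the locking approach mentioned at the start of this answer could look roughly like the sketch below. It is a minimal illustration, not the only way to do it: it keeps the question's MyTestObject structure and guards every read and write of the buffer with a private lock object (the name `lock` is introduced here for illustration), using Scala's built-in `synchronized`.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global

object MyTestObject {

  // Hypothetical dedicated lock; every access to the buffer goes through it
  private val lock = new Object

  // Shared mutable buffer, as in the question, but only touched while holding `lock`
  private var listBufferOfInts = new ListBuffer[Int]()

  def runFunction(): Int = {
    // Reset the buffer under the lock so the new reference is safely published
    lock.synchronized { listBufferOfInts = new ListBuffer[Int]() }

    val inputListOfInts = (1 to 1000).toList
    val fut = Future.traverse(inputListOfInts) { i =>
      Future {
        appendElem(i)
      }
    }
    // traverse completes only after every inner Future (and its append) has finished
    Await.ready(fut, Duration.Inf)

    // Read under the same lock so all appends from worker threads are visible
    lock.synchronized { listBufferOfInts.length }
  }

  // Only one thread at a time can append, so no update can overwrite another
  def appendElem(elem: Int): Unit = lock.synchronized {
    listBufferOfInts += elem
  }
}
```

With this change, repeated calls to `MyTestObject.runFunction()` should return 1000 each time. The trade-off is that all appends are serialized on one lock, which is why the lock-free Future.traverse version above is usually the simpler choice.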