scala, apache-spark

Scala class lazy val variables strange behaviour with Spark


I noticed some strange behaviour while using Spark on a project with Scala 2.10. I read a properties file and write all of its contents into a Map (loadConfig), and I also created a simple method that returns the value for a given key.

The problem is that when I read all the blacklisted names into a lazy val class variable, namesBlackList appears to be empty: all of my Person records end up with the "full access" tag, which is not correct.

However, when I build namesBlackList inside filterAccess, everything works perfectly well.

ConfigManager.scala

object ConfigManager extends Serializable {

  private var configMap = Map.empty[String, String]

  def loadConfig(configPath:String) = {
    // Reads a key/value properties file and writes it in the configmap
  }

  def getParameter(parameter: String): String = configMap.getOrElse(parameter, s"${parameter}=>UNKNOWN")
}

AnalyseData.scala

import org.apache.spark.rdd.RDD

object AnalyseData extends Serializable {

    private lazy val namesBlackList = ConfigManager.getParameter("names.blacklist").split(",").toSet

    def filterAccess(rdd: RDD[Person]): RDD[(String, String, String)] = {
        rdd.map { person =>
          if (namesBlackList.contains(person.firstName))
            (person.firstName, person.lastName, "limited access")
          else
            (person.firstName, person.lastName, "full access")
        }
    }
}

AnalyseService.scala

object AnalyseService extends Serializable {
    def main(path:String) {
        ConfigManager.loadConfig(path)

        val datas = createNameRdd // reads from a db and creates an RDD[Person]

        val filteredData = AnalyseData.filterAccess(datas)

    }
}

I tried tweaking everything in my code, and it appears that, since Spark executes the map method lazily, reading the result from a singleton object into a lazy val class variable does not produce the correct result. I can't understand why it is not working and, more importantly, I can't find how to fix it besides building namesBlackList inside the method.
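
For reference, this is the variant that works (a sketch of what I mean; I'm assuming the names.blacklist property is a comma-separated list):

def filterAccess(rdd: RDD[Person]): RDD[(String, String, String)] = {
    // Built on the driver, so the Set is captured by the closure and shipped to the workers
    val namesBlackList = ConfigManager.getParameter("names.blacklist").split(",").toSet
    rdd.map { person =>
      if (namesBlackList.contains(person.firstName))
        (person.firstName, person.lastName, "limited access")
      else
        (person.firstName, person.lastName, "full access")
    }
}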

Thanks for your comments.


Solution

  • See https://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka for an explanation of some of the terms and concepts involved. What happens in your case (I think):

    1. ConfigManager.loadConfig(path) runs on the driver node. configMap is initialized there.

    2. In filterAccess, namesBlackList is really a method call (a lazy val compiles to an accessor). So, when the code inside map is executed on a worker node, the call happens there, in that JVM's own fresh copies of the AnalyseData and ConfigManager singletons, and it reads the worker's configMap, which was never loaded and is therefore empty.

    3. However, when you "write namesBlackList inside filterAccess", it is a local variable: it is evaluated on the driver, becomes part of the closure, and is serialized to the workers.

    To resolve this, you need to use a broadcast variable for configMap. Something like

    import org.apache.spark.broadcast.Broadcast

    object ConfigManager extends Serializable {

      private var configMap: Broadcast[Map[String, String]] = _
    
      def loadConfig(configPath:String) = {
        // Reads the key/value properties file and broadcasts the resulting map (see the sketch below)
      }
    
      def getParameter(parameter: String): String = configMap.value.getOrElse(parameter, s"${parameter}=>UNKNOWN")
    }
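
    loadConfig then needs a SparkContext to create the broadcast. A minimal sketch of what it could look like (note the extra SparkContext parameter; the java.util.Properties loading is just an assumption about how the file is read):

    import java.io.FileInputStream
    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.spark.SparkContext

    def loadConfig(sc: SparkContext, configPath: String): Unit = {
      val props = new Properties()
      val in = new FileInputStream(configPath)
      try props.load(in) finally in.close()
      // Broadcast once from the driver; workers then read configMap.value locally
      configMap = sc.broadcast(props.asScala.toMap)
    }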
    

    Even better, to avoid the var entirely, pass the config around explicitly:

    def main(path: String) {
        val configMap = ConfigManager.loadConfig(path)

        val datas = createNameRdd(configMap) // reads from a db and creates an RDD[Person]

        val filteredData = AnalyseData.filterAccess(datas, configMap)
    }
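
    filterAccess then takes the configuration as an explicit parameter instead of reaching into a singleton. A sketch, assuming loadConfig now returns the Broadcast[Map[String, String]] (imports for RDD and Broadcast as above); mapPartitions rebuilds the Set only once per partition:

    def filterAccess(rdd: RDD[Person],
                     configMap: Broadcast[Map[String, String]]): RDD[(String, String, String)] = {
      rdd.mapPartitions { people =>
        // .value is resolved on the worker against its locally cached copy of the broadcast
        val namesBlackList =
          configMap.value.getOrElse("names.blacklist", "").split(",").toSet
        people.map { person =>
          if (namesBlackList.contains(person.firstName))
            (person.firstName, person.lastName, "limited access")
          else
            (person.firstName, person.lastName, "full access")
        }
      }
    }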