I noticed a strange behaviour while using Spark in a Scala 2.10 project. I read a properties file and write all of its content into a Map (loadConfig), and I also created a simple method that returns the value of a given key.
The problem is that when I get all the blacklisted names in a lazy val class variable, namesBlackList appears to be empty: every one of my Person records gets the "full access" tag, which is not correct. However, when I define namesBlackList inside filterAccess, everything works perfectly well.
ConfigManager.scala
    object ConfigManager extends Serializable {
      private var configMap = Map.empty[String, String]

      def loadConfig(configPath: String): Unit = {
        // Reads a key/value properties file and writes it into configMap
      }

      def getParameter(parameter: String): String =
        configMap.getOrElse(parameter, s"${parameter}=>UNKNOWN")
    }
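A minimal sketch of what the body of loadConfig might look like, assuming the file uses the standard java.util.Properties key=value format and is UTF-8 encoded. PropertiesLoader and readProperties are hypothetical names, not part of the original code.

```scala
import java.io.{FileInputStream, InputStreamReader}
import java.util.Properties

// Hypothetical helper: reads a java.util.Properties file into an immutable Scala Map
object PropertiesLoader {
  def readProperties(configPath: String): Map[String, String] = {
    val props = new Properties()
    val in = new FileInputStream(configPath)
    try props.load(new InputStreamReader(in, "UTF-8")) // assumes a UTF-8 encoded file
    finally in.close()

    // Copy every key/value pair without relying on version-specific collection converters
    val builder = Map.newBuilder[String, String]
    val keys = props.stringPropertyNames().iterator()
    while (keys.hasNext) {
      val key = keys.next()
      builder += key -> props.getProperty(key)
    }
    builder.result()
  }
}
```

The question's loadConfig could then assign configMap = PropertiesLoader.readProperties(configPath).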
AnalyseData.scala
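    import org.apache.spark.rdd.RDD

    object AnalyseData extends Serializable {
      // Assumes names.blacklist is comma-separated; calling .toSet directly on the String would give a Set[Char]
      private lazy val namesBlackList: Set[String] =
        ConfigManager.getParameter("names.blacklist").split(",").map(_.trim).toSet

      // map produces tuples, so the result type is RDD[(String, String, String)], not RDD[Person]
      def filterAccess(rdd: RDD[Person]): RDD[(String, String, String)] = {
        rdd.map { person =>
          if (namesBlackList.contains(person.firstName))
            (person.firstName, person.lastName, "limited access")
          else
            (person.firstName, person.lastName, "full Access")
        }
      }
    }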
AnalyseService.scala
    object AnalyseService extends Serializable {
      def main(path: String): Unit = {
        ConfigManager.loadConfig(path)
        val datas = createNameRdd // reads from a db and creates an RDD[Person]
        val filteredData = AnalyseData.filterAccess(datas)
      }
    }
I tried tweaking everything in my code, and it appears that, since Spark executes the map method lazily, storing the result of a call on a singleton object in a lazy val class variable does not produce the correct result.
I can't understand why it is not working and, more importantly, I can't find a way to fix this other than evaluating namesBlackList inside a method.
Thanks for your comments.
See https://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka for an explanation of the terms and concepts involved. Here is what happens in your case (I think):
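1. ConfigManager.loadConfig(path) runs on the driver node, so configMap is initialized there.
2. In filterAccess, namesBlackList is really a method call on the AnalyseData singleton, not a value captured by the closure. When the code inside map is executed on the worker nodes, this call happens there: the lazy val is initialized in the worker's JVM and reads that JVM's copy of configMap, which is empty because loadConfig was only ever called on the driver. getParameter therefore returns the "names.blacklist=>UNKNOWN" fallback, which matches no first name.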
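The effect can be reproduced without Spark by running the same lookup against an empty map, which is what a worker's copy of ConfigManager holds. This is a minimal sketch: the map is passed in explicitly to stand in for the driver's and a worker's state, and it assumes names.blacklist is a comma-separated list of first names. ExecutorSideLookup is a hypothetical name.

```scala
object ExecutorSideLookup {
  // Same fallback logic as ConfigManager.getParameter, with the map passed in explicitly
  def getParameter(configMap: Map[String, String], parameter: String): String =
    configMap.getOrElse(parameter, s"${parameter}=>UNKNOWN")

  // Assumes a comma-separated value such as "Alice, Bob"
  def namesBlackList(configMap: Map[String, String]): Set[String] =
    getParameter(configMap, "names.blacklist").split(",").map(_.trim).toSet

  // Same tagging rule as filterAccess in the question
  def accessTag(blackList: Set[String], firstName: String): String =
    if (blackList.contains(firstName)) "limited access" else "full Access"
}
```

With Map("names.blacklist" -> "Alice, Bob") (the driver's state) "Alice" is tagged "limited access"; with Map.empty (a worker's state) the blacklist is Set("names.blacklist=>UNKNOWN") and everyone gets "full Access".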
However, when you "write namesBlackList inside filterAccess", it is a local variable: it is evaluated on the driver, where configMap has been loaded, and the resulting Set becomes part of the closure and is serialized to the workers.
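A sketch of that local-value fix, assuming the blacklist is a comma-separated list. So that it runs without Spark, a Seq stands in for RDD[Person] (with Spark the body is the same, calling rdd.map), the configuration map is passed in as a parameter, and Person is redefined with only the two fields the question uses; AnalyseDataLocal is a hypothetical name.

```scala
// Minimal stand-in for the question's Person type (only the fields used here)
final case class Person(firstName: String, lastName: String)

object AnalyseDataLocal {
  def filterAccess(people: Seq[Person], configMap: Map[String, String]): Seq[(String, String, String)] = {
    // Evaluated once, where filterAccess is called (on the driver in Spark)
    val namesBlackList: Set[String] =
      configMap.getOrElse("names.blacklist", "").split(",").map(_.trim).filter(_.nonEmpty).toSet

    // The closure refers only to the local Set, so in Spark that Set is what gets serialized
    people.map { person =>
      val tag = if (namesBlackList.contains(person.firstName)) "limited access" else "full Access"
      (person.firstName, person.lastName, tag)
    }
  }
}
```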
To resolve this, one option is to use a broadcast variable for configMap. Something like:
    import org.apache.spark.broadcast.Broadcast

    object ConfigManager extends Serializable {
      private var configMap: Broadcast[Map[String, String]] = _

      def loadConfig(configPath: String): Unit = {
        // Reads a key/value properties file and stores it in configMap as a broadcast variable (e.g. via sc.broadcast)
      }

      def getParameter(parameter: String): String =
        configMap.value.getOrElse(parameter, s"${parameter}=>UNKNOWN")
    }
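Even better, avoid the var. A var inside an object is only assigned on the driver, so the workers would again see it unset (null for the Broadcast above); returning the configuration from loadConfig and passing it explicitly lets it be captured in the closure: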
    def main(path: String): Unit = {
      val configMap = ConfigManager.loadConfig(path) // loadConfig now returns the configuration instead of assigning a var
      val datas = createNameRdd(configMap) // reads from a db and creates an RDD[Person]
      val filteredData = AnalyseData.filterAccess(datas, configMap)
    }