scala, apache-spark, apache-spark-sql, notserializableexception

Why does custom DefaultSource give java.io.NotSerializableException?


This is my first post on SO, and my apologies if I'm using an improper format.

I'm working with Apache Spark to create a new source (via DefaultSource), BaseRelations, etc., and have run into a problem with serialization that I would like to understand better. Consider the class below, which extends BaseRelation and implements the scan builder.

    class RootTableScan(path: String, treeName: String)(@transient val sqlContext: SQLContext)
        extends BaseRelation with PrunedFilteredScan {

      private val att: core.SRType = {
        val reader = new RootFileReader(new java.io.File(Seq(path).head))
        val tmp =
          if (treeName == null)
            buildATT(findTree(reader.getTopDir), arrangeStreamers(reader), null)
          else
            buildATT(reader.getKey(treeName).getObject.asInstanceOf[TTree],
              arrangeStreamers(reader), null)
        tmp
      }

      // define the schema from the AST
      def schema: StructType = {
        val s = buildSparkSchema(att)
        s
      }

      // builds a scan
      def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {

        // parallelize over all the files
        val r = sqlContext.sparkContext.parallelize(Seq(path), 1).
          flatMap({ fileName =>
            val reader = new RootFileReader(new java.io.File(fileName))
            // get the TTree
            /* PROBLEM !!! */
            val rootTree =
              // findTree(reader)
              if (treeName == null) findTree(reader)
              else reader.getKey(treeName).getObject.asInstanceOf[TTree]
            new RootTreeIterator(rootTree, arrangeStreamers(reader),
              requiredColumns, filters)
          })

        println("Done building Scan")
        r
      }
    }

PROBLEM marks where the issue happens. treeName is a val that gets injected into the class through the constructor. The lambda that uses it is executed on the executors, so I do need to ship treeName to them, i.e. serialize it. I would like to understand why exactly the code snippet below causes this NotSerializableException. I know for sure that without treeName in it, it works just fine:

    val rootTree =
      // findTree(reader)
      if (treeName == null) findTree(reader)
      else reader.getKey(treeName).getObject.asInstanceOf[TTree]

Below is the stack trace:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2056)
  at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:375)
  at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:374)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.flatMap(RDD.scala:374)
  at org.dianahep.sparkroot.package$RootTableScan.buildScan(sparkroot.scala:95)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$8.apply(DataSourceStrategy.scala:260)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$8.apply(DataSourceStrategy.scala:260)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:303)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:302)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:379)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:298)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:256)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:61)
  at org.apache.spark.sql.execution.SparkPlanner.plan(SparkPlanner.scala:47)
  at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:51)
  at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:48)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:300)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:298)
  at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
  at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2572)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
  ... 50 elided
Caused by: java.io.NotSerializableException: org.dianahep.sparkroot.package$RootTableScan
Serialization stack:
    - object not serializable (class: org.dianahep.sparkroot.package$RootTableScan, value: org.dianahep.sparkroot.package$RootTableScan@6421e9e7)
    - field (class: org.dianahep.sparkroot.package$RootTableScan$$anonfun$1, name: $outer, type: class org.dianahep.sparkroot.package$RootTableScan)
    - object (class org.dianahep.sparkroot.package$RootTableScan$$anonfun$1, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)

From the stack trace I think I can deduce that it tries to serialize my lambda and cannot. This lambda should be a closure, since it references a val that is defined outside of the lambda's scope. But I don't understand why it cannot be serialized.
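
To make this concrete, here is a minimal, self-contained sketch (the class and method names are made up, not part of my source) that hits the same error. Inside a class body, treeName really means this.treeName, so the closure has to drag the whole enclosing instance along:

    import org.apache.spark.SparkContext

    // Hypothetical stand-in for RootTableScan: it does NOT extend Serializable
    class TreeNameHolder(val treeName: String) {
      def run(sc: SparkContext): Array[String] =
        sc.parallelize(Seq("a", "b"))
          .map(x => x + treeName) // compiles to x + this.treeName -> captures `this`
          .collect()              // throws SparkException: Task not serializable
    }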

Any help would be really appreciated!!! Thanks a lot!


Solution

  • Any time a Scala closure references a class variable, like treeName, the JVM serializes the enclosing class instance along with the closure. Your class RootTableScan is not serializable, though! The solution is to create a local string variable:

        // builds a scan
        def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {

          val localTreeName = treeName // a local copy; safe to serialize on its own

          // parallelize over all the files
          val r = sqlContext.sparkContext.parallelize(Seq(path), 1).
            flatMap({ fileName =>
              val reader = new RootFileReader(new java.io.File(fileName))
              // get the TTree
              val rootTree =
                // findTree(reader)
                if (localTreeName == null) findTree(reader)
                else reader.getKey(localTreeName).getObject.asInstanceOf[TTree]
              new RootTreeIterator(rootTree, arrangeStreamers(reader),
                requiredColumns, filters)
            })

          println("Done building Scan")
          r
        }
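
The same one-line change applied to the toy class from the question (again, hypothetical names) shows why this works: the closure now captures only a plain String instead of the enclosing instance.

    import org.apache.spark.SparkContext

    // Same hypothetical class as before, still not Serializable, with the local-copy fix
    class TreeNameHolderFixed(val treeName: String) {
      def run(sc: SparkContext): Array[String] = {
        val localTreeName = treeName      // a plain String, serializable on its own
        sc.parallelize(Seq("a", "b"))
          .map(x => x + localTreeName)    // captures only the local String, not `this`
          .collect()                      // works
      }
    }

Making RootTableScan itself extend Serializable would also silence the exception, but only if every non-@transient field (att, for example) is serializable as well, so the local-copy pattern is usually the safer fix.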