apache-spark, checkpointing

Spark Checkpointing Non-Streaming - Checkpoint files can be used in subsequent job run or driver program


This text is from an interesting article: http://www.lifeisafile.com/Apache-Spark-Caching-Vs-Checkpointing/

" ... Checkpointing stores the rdd physically to hdfs and destroys the lineage that created it. The checkpoint file won’t be deleted even after the Spark application terminated. Checkpoint files can be used in subsequent job run or driver program. Checkpointing an RDD causes double computation because the operation will first call a cache before doing the actual job of computing and writing to the checkpoint directory. ..."

I seem to remember reading elsewhere that checkpointed files were only for a Job or shared Jobs in a given Spark App.

Looking for clarification on whether, and how, a new App could use the checkpoint directory, as I did not think that was possible.


Solution

  • I seem to remember reading elsewhere that checkpointed files were only for a Job or shared Jobs in a given Spark App.

    Spark will not purge the checkpoint directory even after the SparkContext is stopped. We can turn on automatic cleanup by setting the property below:

    spark.conf.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
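
    Note that this flag is read from the SparkContext's SparkConf, so the safer place to set it is at session creation rather than on an already-running session. A minimal sketch (master, app name and path are illustrative):

    // Sketch: set the cleanup flag before the SparkContext starts
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("checkpoint-cleanup-demo")
      .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
      .getOrCreate()

    spark.sparkContext.setCheckpointDir("/tmp/spark/checkpoint")

    With this flag on, checkpoint files are deleted once the corresponding RDD is garbage collected, so they cannot be reused by a later application; leave it at its default (false) for the reuse scenario below.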
    

    Looking for clarification on whether, and how, a new App could use the checkpoint directory, as I did not think that was possible.

    To reuse the checkpointed dataset from a new application, we can follow the steps below:

    1. Start context 1 and checkpoint the dataset:
    // Setting logger on for ReliableRDDCheckpointData
    scala> import org.apache.log4j.{Level, Logger}
    scala> Logger.getLogger("org.apache.spark.rdd.ReliableRDDCheckpointData").setLevel(Level.INFO)
    
    // Note application ID
    scala> spark.sparkContext.applicationId
    res1: String = local-1567969150914
    
    // Set checkpoint Dir
    scala> spark.sparkContext.setCheckpointDir("/tmp/spark/checkpoint")
    
    // File system location
    Users-Air:checkpoint User$ pwd
    /tmp/spark/checkpoint
    Users-Air:checkpoint User$ ls -lrth
    total 0
    drwxr-xr-x  2 User  wheel    64B Sep  8 15:00 7aabcb46-e707-49dd-8893-148a162368d5
    
    // Create Dataframe
    scala> val df = spark.range(3).withColumn("random", rand())
    scala> df.show
    +---+------------------+
    | id|            random|
    +---+------------------+
    |  0|0.8517439782779789|
    |  1| 0.288880016535247|
    |  2|0.7027831376739603|
    +---+------------------+
    
    scala> df.schema
    res5: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(random,DoubleType,false))
    
    // Checkpoint the DataFrame
    scala> df.checkpoint
    19/09/08 15:02:22 INFO ReliableRDDCheckpointData: Done checkpointing RDD 7 to file:/tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7, new parent is RDD 8
    res6: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, random: double]
    
    // New RDD saved in checkpoint directory /tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7
    Users-Air:7aabcb46-e707-49dd-8893-148a162368d5 User$ cd rdd-7/
    Users-Air:rdd-7 User$ ls -lrth
    total 32
    -rw-r--r--  1 User  wheel     4B Sep  8 15:02 part-00000
    -rw-r--r--  1 User  wheel   163B Sep  8 15:02 part-00002
    -rw-r--r--  1 User  wheel   163B Sep  8 15:02 part-00001
    -rw-r--r--  1 User  wheel   163B Sep  8 15:02 part-00003
    
    // Stop context 
    scala> spark.stop
    scala> :quit
    
    
    2. Start new context 2 and read the checkpointed dataset:
    // Initialized new context
    scala> spark.sparkContext.applicationId
    res0: String = local-1567969525656
    

    SparkContext.checkpointFile is a protected[spark] method, so we need to create a class under the org.apache.spark package:

    scala> :paste -raw
    // Entering paste mode (ctrl-D to finish)
    
    package org.apache.spark
    object RecoverCheckpoint {
      import scala.reflect.ClassTag
      import org.apache.spark.rdd.RDD
      def recover[T: ClassTag](sc: SparkContext, path: String): RDD[T] = {
        sc.checkpointFile[T](path)
      }
    }
    
    

    Now recover the checkpointed RDD as an RDD[InternalRow] using the RecoverCheckpoint object defined above:

    // Path from first context
    scala> val checkPointFilePath = "/tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7"
    scala> import org.apache.spark.RecoverCheckpoint
    scala> import org.apache.spark.sql.catalyst.InternalRow
    scala> import org.apache.spark.sql.types._
    scala> val RecoveredRDD = RecoverCheckpoint.recover[InternalRow](spark.sparkContext, checkPointFilePath)
    
    // RDD is recovered as RDD[InternalRow]
    scala> RecoveredRDD
    res2: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = ReliableCheckpointRDD[0] at recover at <console>:34
    
    // Count matches with original
    scala> RecoveredRDD.count
    res3: Long = 3
    
    

    To convert the recovered RDD back to a DataFrame, create a RecoverCheckpointRDDToDF object:

    
    // Need to convert RDD[InternalRow] to DataFrame
    scala> :paste -raw
    // Entering paste mode (ctrl-D to finish)
    
    // Creating Dataframe from RDD[InternalRow]
    package org.apache.spark.sql
    object RecoverCheckpointRDDToDF {
      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.{DataFrame, SparkSession}
      import org.apache.spark.sql.catalyst.InternalRow
      import org.apache.spark.sql.types.StructType
      def createDataFrame(spark: SparkSession, catalystRows: RDD[InternalRow], schema: StructType): DataFrame = {
        spark.internalCreateDataFrame(catalystRows, schema)
      }
    }
    
    

    Finally, use RecoverCheckpointRDDToDF to get the DataFrame back:

    // Schema must be known in this context (see the schema-persistence sketch at the end of this answer)
    scala> val df_schema = StructType(List(StructField("id",LongType,false), StructField("random",DoubleType,false)))
    df_schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(random,DoubleType,false))
    
    scala> import org.apache.spark.sql.RecoverCheckpointRDDToDF
    scala> val df = RecoverCheckpointRDDToDF.createDataFrame(spark, RecoveredRDD, df_schema)
    
    scala> df.show
    +---+------------------+
    | id|            random|
    +---+------------------+
    |  0|0.8517439782779789|
    |  1| 0.288880016535247|
    |  2|0.7027831376739603|
    +---+------------------+
    
    // Same as first context
    
    // Stop context
    scala> spark.stop
    scala> :quit
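
    One remaining wrinkle: the schema has to be known in the second application. A possible workaround, rather than hard-coding the StructType as above (a sketch that is not part of the original answer; the schema file path is arbitrary), is to persist df.schema.json next to the checkpoint in context 1 and rebuild the StructType with DataType.fromJson in context 2:

    // In context 1, right after checkpointing: save the schema as JSON (arbitrary path)
    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Paths}
    Files.write(
      Paths.get("/tmp/spark/checkpoint/df_schema.json"),
      df.schema.json.getBytes(StandardCharsets.UTF_8))

    // In context 2, before calling RecoverCheckpointRDDToDF.createDataFrame: rebuild the schema
    import org.apache.spark.sql.types.{DataType, StructType}
    val schemaJson = new String(
      Files.readAllBytes(Paths.get("/tmp/spark/checkpoint/df_schema.json")),
      StandardCharsets.UTF_8)
    val df_schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]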