
Reading from a Hive table and updating the same table in PySpark using checkpoint


I am using Spark 2.3 and trying to read a Hive table in Spark as follows:

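from pyspark.sql import SparkSession

# Hive support is needed so that spark.table() can resolve metastore tables
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df = spark.table("emp.emptable")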

Here I add a new column containing the current system date to the existing DataFrame:

import pyspark.sql.functions as F
newdf = df.withColumn('LOAD_DATE', F.current_date())

Now I am facing an issue when I try to write this DataFrame back to the same Hive table:

newdf.write.mode("overwrite").saveAsTable("emp.emptable")

pyspark.sql.utils.AnalysisException: u'Cannot overwrite table emp.emptable that is also being read from;'

So I am checkpointing the DataFrame to break the lineage, since I am reading from and writing to the same table:

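checkpointDir = "/hdfs location/temp/tables/"
spark.sparkContext.setCheckpointDir(checkpointDir)
# checkpoint() is eager by default: it writes the data under checkpointDir and
# returns a DataFrame whose plan no longer reads from emp.emptable
df = spark.table("emp.emptable").coalesce(1).checkpoint()
newdf = df.withColumn('LOAD_DATE', F.current_date())
newdf.write.mode("overwrite").saveAsTable("emp.emptable")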

This works fine, and the new column is added to the Hive table. However, I have to delete the checkpoint files every time they are created. Is there a better way to break the lineage and write the same DataFrame with the updated columns, either to an HDFS location or as a Hive table?

Or is there a way to specify a temporary location for the checkpoint directory that gets deleted once the Spark session completes?


Solution

  • As discussed in this post, setting the property below is the way to go:

    spark.conf.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
    

    That question had a different context: we wanted to retain the checkpointed dataset, so we did not bother adding a cleanup solution.

    Setting the above property works sometimes (tested in Scala, Java, and Python), but it is hard to rely on. The official documentation says the property "controls whether to clean checkpoint files if the reference is out of scope". I don't know exactly what that means, because my understanding is that the files should be cleaned up once the Spark session/context is stopped. It would be great if someone could shed light on it.

    Regarding

    Is there any best way to break the lineage

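    Check this question: @BiS found a way to cut the lineage using the createDataFrame(RDD, schema) method. I haven't tested it myself, though. Unlike an eager checkpoint, this does not copy the data anywhere, so it is worth verifying that it is safe when overwriting the same table the data is read from.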

    Just FYI, I usually don't rely on the above property; to be on the safe side, I delete the checkpoint directory in the code itself.

    We can get the checkpoint directory as follows:

    Scala:

    //Set directory
    scala> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoint/")
    
    scala> spark.sparkContext.getCheckpointDir.get
    res3: String = hdfs://<name-node:port>/tmp/checkpoint/625034b3-c6f1-4ab2-9524-e48dfde589c3
    
    // It returns a String, so org.apache.hadoop.fs can be used to delete the path
    

    PySpark:

    # Set directory
    >>> spark.sparkContext.setCheckpointDir('hdfs:///tmp/checkpoint')
    >>> t = sc._jsc.sc().getCheckpointDir().get()
    >>> t
    u'hdfs://<name-node:port>/tmp/checkpoint/dc99b595-f8fa-4a08-a109-23643e2325ca'
    
    # Note the 'u' prefix: under Python 2 this is a unicode object, so convert it with str(t)
    # Get the Hadoop FileSystem object, then check for and delete the path
    
    >>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
    >>> fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
    True
    
    >>> fs.delete(sc._jvm.org.apache.hadoop.fs.Path(str(t)), True)  # True = recursive delete
    True
    
    >>> fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
    False
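Putting the two suggestions above together for the question's LOAD_DATE case, a minimal sketch could look like this. It assumes an existing SparkSession with Hive support, and the helper names (configure_builder, delete_checkpoint_dir, overwrite_with_column) are hypothetical. It also rests on my reading of Spark's behavior rather than on the documentation: the cleanup flag appears to be read from the SparkContext configuration, so it is set on the builder before the context starts (a spark.conf.set() on an already running session may have no effect, which could explain why the property only works sometimes). Even when the flag is active, the cleaner seems to remove checkpoint files only after the checkpointed RDD is garbage-collected on the driver, not when the session stops, so the explicit delete is kept as the dependable part.

```python
# Sketch combining the answer's two suggestions for the question's use case.
# Hypothetical names: configure_builder, delete_checkpoint_dir, overwrite_with_column.
# `spark` is assumed to be an existing SparkSession with Hive support.

CLEAN_CHECKPOINTS = "spark.cleaner.referenceTracking.cleanCheckpoints"


def configure_builder(builder):
    """Set the cleanup flag on a SparkSession builder before getOrCreate().

    Assumption: Spark reads this flag from the SparkContext configuration,
    so it is set here rather than via spark.conf.set() on a running session.
    """
    return builder.config(CLEAN_CHECKPOINTS, "true")


def delete_checkpoint_dir(spark):
    """Recursively delete the current checkpoint directory, if one is set.

    Returns True if Hadoop reports that the directory was deleted, else False.
    """
    sc = spark.sparkContext
    maybe_dir = sc._jsc.sc().getCheckpointDir()  # a Scala Option[String]
    if maybe_dir.isEmpty():
        return False
    path = sc._jvm.org.apache.hadoop.fs.Path(str(maybe_dir.get()))
    # Resolve the file system from the path itself, so hdfs:// and file:// both work
    fs = path.getFileSystem(sc._jsc.hadoopConfiguration())
    return bool(fs.delete(path, True))  # True = delete recursively


def overwrite_with_column(spark, table, checkpoint_root, name, column):
    """Add or replace column `name` in `table`, overwrite the table, then clean up."""
    spark.sparkContext.setCheckpointDir(checkpoint_root)
    # checkpoint() is eager by default: the data is copied under checkpoint_root
    # and the plan no longer reads from `table`, so the overwrite is allowed
    df = spark.table(table).checkpoint()
    df.withColumn(name, column).write.mode("overwrite").saveAsTable(table)
    # Only reached if the write succeeded; after a failure the checkpoint is kept,
    # since it may then be the only remaining copy of the data
    return delete_checkpoint_dir(spark)


# Example usage (requires PySpark and a Hive metastore):
# from pyspark.sql import SparkSession
# import pyspark.sql.functions as F
# spark = configure_builder(SparkSession.builder).enableHiveSupport().getOrCreate()
# overwrite_with_column(spark, "emp.emptable", "hdfs:///tmp/checkpoint",
#                       "LOAD_DATE", F.current_date())
```

In the pyspark shell the SparkContext already exists, so builder settings would likely not reach it; there the flag could instead be passed at launch with --conf spark.cleaner.referenceTracking.cleanCheckpoints=true.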