apache-spark apache-kafka spark-structured-streaming spark-checkpoint

How to clean up the checkpoint files accumulated in spark structured streaming?


For a long-running Spark Structured Streaming job that processes Kafka data, I set a checkpoint directory on the SparkContext and a checkpoint location on the write query.

spark.sparkContext.setCheckpointDir("/tmp/checkpoint")

...

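import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val monitoring_stream = monitoring_df.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .option("checkpointLocation", "s3a://spark-checkpoint/checkpointfiles")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) {
      // process the non-empty micro-batch here
    }
  }
  .start()

// keep the handle to the StreamingQuery; awaitTermination() returns Unit
monitoring_stream.awaitTermination()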

The Spark job runs stably. However, I noticed that checkpoint files keep accumulating in HDFS and S3 with no automatic cleanup, and they are steadily consuming storage space. Is there a way to configure a retention time so that these checkpoint files are deleted automatically? Or do I need to run a cron job to delete them manually? If I delete them manually, will that affect the running Spark jobs? Thanks!


Solution

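  • Set spark.cleaner.referenceTracking.cleanCheckpoints to true (the default is false). With it enabled, Spark's ContextCleaner deletes the RDD checkpoint files under the directory passed to setCheckpointDir once the checkpointed RDD has been garbage-collected on the driver. The flag is read when the SparkContext starts, so it has to be set at launch rather than on an already running session.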
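A minimal sketch of enabling the flag when the session is built, before setCheckpointDir is called as in the question. The object name CheckpointCleanupExample, the helper buildSession and the app name "monitoring-stream" are hypothetical; the master URL is assumed to come from spark-submit.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example object; names are illustrative only.
object CheckpointCleanupExample {

  // Hypothetical helper: builds a session with RDD checkpoint cleanup enabled.
  // The flag is passed on the builder because it is read when the
  // SparkContext is created, not picked up later.
  def buildSession(appName: String): SparkSession =
    SparkSession.builder()
      .appName(appName)
      .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = buildSession("monitoring-stream") // hypothetical app name

    // RDD checkpoint directory from the question. Files written here for RDDs
    // that are later garbage-collected on the driver are removed by the cleaner.
    spark.sparkContext.setCheckpointDir("/tmp/checkpoint")

    // ... build monitoring_df and start the streaming query as before ...
  }
}
```

When the job is launched with spark-submit, passing --conf spark.cleaner.referenceTracking.cleanCheckpoints=true has the same effect. Because cleanup is driven by driver-side garbage collection, files may linger until a GC runs; Spark triggers one periodically (spark.cleaner.periodicGC.interval, 30min by default).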