apache-spark, apache-spark-sql, spark-structured-streaming, apache-spark-2.0

How to delete old data that was created by Spark Structured Streaming?


How can I delete old data created by Spark Structured Streaming (Spark 2.4.5)?

I have data on HDFS in Parquet/Avro format (not Delta) that is created by Spark Structured Streaming and partitioned by time (year, month, day of month, hour).

The data is created as follows:

query = (df.writeStream
    .format("avro")
    .partitionBy("year", "month", "day", "hour")
    .outputMode("append")
    .option("checkpointLocation", "/data/avro.cp")
    .start("/data/avro"))

As a result I have the following partition folder layout:

./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13
./year=2020/month=3/day=13/hour=14
./year=2020/month=3/day=13/hour=15
./year=2020/month=3/day=13/hour=16

How can I delete old data, for example everything older than year=2020, month=3, day=13, hour=14?

Just deleting the relevant folders

./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13

throws an exception when reading a batch DataFrame from the file system:

df = spark.read.format("avro").load("/data/avro")
java.io.FileNotFoundException: File file:/data/avro/year=2020/month=3/day=13/hour=12/part-00000-0cc84e65-3f49-4686-85e3-1ecf48952794.c000.avro does not exist

As far as I've figured out, this is somehow related to the _spark_metadata folder that is used by checkpointing.
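For reference, that log can be listed by reading the folder as plain text; a quick sketch, reusing the /data/avro output path from above (the hour=12 filter just picks one of the partitions I want to drop):

log = spark.read.text("/data/avro/_spark_metadata/*")
log.filter(log.value.contains("hour=12")).show(truncate=False)

Each entry is a JSON line recording a file the streaming sink committed, so files that have been deleted from disk are still listed there.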

I'd appreciate your help.


Solution

  • You can't delete that folder unless you also delete its corresponding checkpoint folders. You are trying to delete the folder while the checkpoint still has knowledge of it, which is why the error occurs.

    However, I really wouldn't recommend messing with the checkpoint folder unless necessary. If it's possible in your situation, I'd suggest instead moving your old data to a cheaper storage tier, for example transitioning from AWS S3 Standard to Glacier.
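    To illustrate that second suggestion: a minimal sketch, assuming the output lived in an S3 bucket (the bucket name my-data-lake and prefix data/avro/ are hypothetical) rather than HDFS, using a boto3 lifecycle rule to transition old objects to Glacier. Unlike deleting folders, a lifecycle transition keeps the object keys in place, so the sink's metadata still points at valid paths; objects in Glacier just need to be restored before Spark can read them again.

    import boto3

    s3 = boto3.client("s3")
    # Transition objects under the streaming output prefix to Glacier after 90 days.
    s3.put_bucket_lifecycle_configuration(
        Bucket="my-data-lake",                       # hypothetical bucket
        LifecycleConfiguration={
            "Rules": [{
                "ID": "archive-old-avro-partitions",
                "Filter": {"Prefix": "data/avro/"},  # hypothetical prefix
                "Status": "Enabled",
                "Transitions": [{"Days": 90, "StorageClass": "GLACIER"}],
            }]
        },
    )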