Tags: apache-spark, pyspark, partitioning, overwrite, delta-lake

Delta Table Overwrite Not Working as Expected with Partitioning in PySpark


I'm working with a large dataset, which is why I need to partition by a specific id.

I have two notebooks that transform data in stages, and I'm confident that the issue lies with the first notebook. When I manually delete the files from the directory where the first notebook writes, the issue is resolved.

Data Transformation Flow

In a normal flow:

  1. Input 1 is transformed into Output 2.
  2. Output 2 is then transformed into Output 3.

However, when I delete a row from Input 1, it still appears in Output 2, even though the following code should overwrite the data in Output 2:

df1.repartition('id').write.format("delta").option("overwriteSchema", "True").mode("overwrite").save('/transformed/output2')
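
An aside that may or may not matter here: repartition('id') only controls how the data is shuffled in memory before the write; it does not create an on-disk partition layout. If the goal is partition pruning on read (and the id column has reasonably low cardinality), the Delta writer would need partitionBy instead. A minimal sketch, assuming the same DataFrame, column name, and path as above:

    # Sketch only: 'id' and the output path are taken from the question above.
    # repartition('id') shuffles rows by id in memory; partitionBy('id') also
    # lays the table out as /transformed/output2/id=<value>/ directories so
    # Delta can prune partitions when the table is read.
    (df1
        .write
        .format("delta")
        .partitionBy("id")
        .option("overwriteSchema", "true")
        .mode("overwrite")
        .save("/transformed/output2"))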

After that, Output 3 is produced using:

df2.write.format("delta").option("overwriteSchema", "True").mode("overwrite").save('/transformed/output3')

Issue

When I manually delete the output2 directory, everything works fine (no more stale data), but shouldn't mode("overwrite") replace the old data automatically? It works as expected most of the time, but in this case the old data seems to persist.
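
For context on why the old rows can still show up: a Delta mode("overwrite") is a logical overwrite recorded in the transaction log; the previous Parquet files are kept in the directory to support time travel until they are vacuumed. A quick way to confirm that each run really did produce a new overwrite version is to inspect the table history (a sketch, assuming the path from the question):

    from delta.tables import DeltaTable

    # Each successful overwrite appears as a new version with operation "WRITE"
    # (mode Overwrite), even though older Parquet files are still physically
    # present in /transformed/output2.
    output2 = DeltaTable.forPath(spark, "/transformed/output2")
    output2.history().select("version", "timestamp", "operation").show(truncate=False)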

Things I've Tried

The main reason for using partitioning is to improve performance, but I’m not sure if it’s contributing to the issue.

Any insights into why this might be happening would be greatly appreciated!


Solution

  • Credit to @AlexanderPavlov for the answer in the comments.


    The issue was that I was unintentionally reading all the data files, including older revisions, instead of just the latest version.

    I resolved this by updating the code from:

    # Old code (incorrect)
    df_subscriptions = spark.read.load(f'/transformed/output2')
    

    To the correct solution:

    # New code (correct)
    df_subscriptions = spark.read.format('delta').load(f'/transformed/output2')
    

    This ensures that Spark reads the table through the Delta transaction log, so only the latest version of the data is returned.
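
    As a follow-up: the older Parquet files that the plain spark.read.load was picking up remain on disk until they are vacuumed, and they can still be read deliberately through time travel. A sketch of both, assuming the same path and Delta's default retention settings:

        from delta.tables import DeltaTable

        # Physically delete data files no longer referenced by recent table
        # versions. By default Delta enforces a minimum retention of 168 hours
        # (7 days) before a file becomes eligible for removal.
        output2 = DeltaTable.forPath(spark, "/transformed/output2")
        output2.vacuum()

        # Until vacuumed, older revisions remain readable on purpose via
        # time travel:
        df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/transformed/output2")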