apache-spark apache-spark-sql amazon-eks

Do you still need to cache() before checkpoint() for Spark batch processing?


Going off the docs and other posts online, you should cache() before checkpoint(), because checkpoint() runs afterwards as a separate job/action and would otherwise recompute the lineage. However, looking at the Spark query plan, this doesn't seem to be true for what I'm doing:

    <bunch of spark transformations - filters, joins, column renaming>
    df_main = df_main.checkpoint()
    <bunch of spark transformations - filters, joins, column renaming>
    df_main.write.mode("overwrite").parquet('s3://<location>')

The query plan for the final parquet job's query:

[screenshot of the query plan]

You can see there's a Scan ExistingRDD, which is the result of the previous query covering the transformations up to the checkpoint. I thought this might be some special quirk of write, so I tried count() instead and got the same result.

I tried adding cache() to see the difference; all it did was change the checkpoint() query to show InMemoryRelation and InMemoryTableScan instead of AdaptiveSparkPlan. The jobs still split the same way.
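
For what it's worth, this is roughly how I'm comparing the plans (a sketch; the paths and filter are made-up placeholders, not my real pipeline):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setCheckpointDir("s3://<bucket>/checkpoints")  # required before checkpoint()

    df_main = spark.read.parquet("s3://<bucket>/input").filter("amount > 0")

    # checkpoint() is eager, so this launches its own job; the returned df's plan
    # starts from Scan ExistingRDD (the checkpointed data), not from the filter.
    cp = df_main.checkpoint()
    cp.explain()

    # Adding cache() first only changes what the checkpoint job's plan looks like in
    # the Spark UI (InMemoryRelation / InMemoryTableScan instead of AdaptiveSparkPlan);
    # the plan downstream of checkpoint() starts from Scan ExistingRDD either way.
    df_main.cache()
    cp_cached = df_main.checkpoint()
    cp_cached.explain()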

I want to understand exactly what's happening so that I know how/if I should be using checkpoint() and cache() together. Am I looking at the wrong thing? Is my understanding incorrect?

Thanks

PS: The one time I did find cache() useful was when I branched off from before the checkpoint; in that case the write did use the existing RDD, but the branch had recomputed everything.

    df_main = xxx
    df_branch = df_main.<transformations>
    df_main = df_main.checkpoint()
    df_main = df_main.union(df_branch)
    df_main.write....
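
Roughly what that looks like with the cache() added at the branch point (again a sketch; paths and transformations are made-up placeholders):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setCheckpointDir("s3://<bucket>/checkpoints")

    df_main = spark.read.parquet("s3://<bucket>/input").filter("amount > 0")
    df_main.cache()  # branch point: both the branch and the checkpoint job read this

    df_branch = df_main.withColumn("source", F.lit("branch"))
    df_main = df_main.checkpoint()
    df_main = df_main.withColumn("source", F.lit("main"))

    df_main.unionByName(df_branch).write.mode("overwrite").parquet("s3://<location>")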

Spark version I'm on: 3.3.0-amzn-1, using AWS/EKS

Edit: I noticed that in a lot of the examples, people don't reassign, i.e. they do df.checkpoint() rather than df = df.checkpoint(). When I do that, both the checkpoint and the write redo all the transformations, and then it makes sense to cache(). This raises the question: when would I want to do df.checkpoint() without reassignment? When I just want to write a checkpoint but not truncate the lineage? Is it only useful for Spark Streaming?
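
For reference, here's the difference I'm seeing, roughly (a sketch with made-up paths, not my actual pipeline):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setCheckpointDir("s3://<bucket>/checkpoints")

    df = spark.read.parquet("s3://<bucket>/input").filter("amount > 0")

    # No reassignment: df still points at the original lineage, so the checkpoint
    # files are written but never read, and the write below re-runs the filter.
    df.checkpoint()
    df.write.mode("overwrite").parquet("s3://<location>/no_reassign")

    # With reassignment: df now wraps the checkpointed data, so the write starts
    # from Scan ExistingRDD and skips the earlier transformations.
    df = df.checkpoint()
    df.write.mode("overwrite").parquet("s3://<location>/reassign")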


Solution

  • As far as I can tell, YOU NO LONGER NEED TO CACHE() BEFORE CHECKPOINT()

    I think most of the calls for caching before checkpoint stem from https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md, which is around 7-8 years old now based on the git blame:

    Instead, it waits until the end of a job, and launches another job to finish Checkpoint. An RDD which needs to be checkpointed will be computed twice; thus it is suggested to do a rdd.cache() before rdd.checkpoint(). In this case, the second job will not recompute the RDD. Instead, it will just read cache
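
    That advice is about the RDD API, where the pattern it describes looks roughly like this (just a sketch of the old recommendation with made-up paths; not something I'm saying you still need):

        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
        sc = spark.sparkContext
        sc.setCheckpointDir("s3://<bucket>/checkpoints")

        rdd = sc.textFile("s3://<bucket>/input").map(lambda line: line.split(","))
        rdd.cache()        # so the follow-up checkpoint job reads the cache...
        rdd.checkpoint()   # ...instead of recomputing the whole lineage
        rdd.count()        # first action runs the job, then a second job writes the checkpoint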
    

    In fact, this Medium article lifts that passage directly as well: https://medium.com/@badwaik.ojas/persist-cache-and-checkpoint-in-apache-spark-ae71783ce3dd. However, the article's own results show what I'm saying: it does not re-calculate the transformations before the checkpoint, it just scans the existing RDD.

    The question "Should cache and checkpoint be used together on DataSets? If so, how does this work under the hood?" mentioned that checkpoint() internally does a count(), which was what caused the double calculation. That WAS true, however:

    https://issues.apache.org/jira/browse/SPARK-8582
    https://github.com/apache/spark/pull/35005

    so checkpoint() is no longer double-calculating. This also matches what I've found in my testing (and it reflects the results in that Medium article).

    Just make sure you reassign your df: df = df.checkpoint()
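
    i.e. on 3.3+ something like the following is enough on its own (a rough sketch; paths are placeholders):

        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
        spark.sparkContext.setCheckpointDir("s3://<bucket>/checkpoints")

        df = spark.read.parquet("s3://<bucket>/input")
        df = df.filter("amount > 0")        # expensive transformations up to here

        df = df.checkpoint()                # eager by default: one job materializes the checkpoint

        df = df.filter("region = 'EU'")     # downstream work starts from Scan ExistingRDD
        df.write.mode("overwrite").parquet("s3://<location>")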