scala apache-spark lazy-evaluation

When are cache and persist executed (since they don't seem like actions)?


I am implementing a Spark application; below is a sample snippet (not the exact code):

val rdd1 = sc.textFile(HDFS_PATH)
val rdd2 = rdd1.map(func)
rdd2.persist(StorageLevel.MEMORY_AND_DISK)
println(rdd2.count)

On checking the performance of this code from the Spark Application Master UI, I see an entry for the count action, but not for the persist. The DAG for this count action also has a node for the 'map' transformation (line 2 of the above code).

Is it safe to conclude that the map transformation is executed when count (in the last line) is encountered, and not when persist is encountered?

Also, at what point is rdd2 actually persisted? I understand that only two types of operations can be called on RDDs - transformations and actions. If the RDD is persisted lazily when the count action is called, would persist be considered a transformation or an action or neither?


Solution

  • Dataset's cache and persist operators are lazy and have no effect until you call an action (and then wait until the caching has finished, which is the extra price for better performance later on).

    From Spark's official documentation RDD Persistence (with the sentence in bold mine):

    One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

    You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
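
    The laziness described in the quote can be observed directly. The following is a minimal sketch, not the asker's code: it assumes a local SparkSession, swaps the HDFS file for a small in-memory collection, and uses a hypothetical accumulator named mapCalls to count how often the map function actually runs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object LazyPersistDemo {
  // Returns the number of map-function calls seen after persist,
  // after the first count, and after the second count.
  def run(): (Long, Long, Long) = {
    // Assumption: a local master is enough to illustrate the behaviour.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("lazy-persist-demo")
      .getOrCreate()
    val sc = spark.sparkContext
    try {
      // Hypothetical accumulator that counts invocations of the map function.
      val calls = sc.longAccumulator("mapCalls")
      val rdd2 = sc.parallelize(1 to 100, 4).map { x => calls.add(1); x * 2 }

      // persist only marks rdd2; no job is submitted here.
      rdd2.persist(StorageLevel.MEMORY_AND_DISK)
      val afterPersist = calls.sum

      // The first action runs the map and stores the computed partitions.
      rdd2.count()
      val afterFirstCount = calls.sum

      // The second action should read the stored partitions instead of re-running the map.
      rdd2.count()
      val afterSecondCount = calls.sum

      (afterPersist, afterFirstCount, afterSecondCount)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

    In a local run without task retries or cache eviction, the counter should stay at 0 after persist, reach 100 after the first count, and remain at 100 after the second, which matches "the first time it is computed in an action, it will be kept in memory".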

    That's exactly the reason why some people (and Spark SQL itself!) do the following trick:

    rdd2.persist(StorageLevel.MEMORY_AND_DISK).count()
    

    to trigger the caching.

    The count operator is fairly cheap, so the net effect is that the caching is executed almost immediately after the line (there might be a small delay before the caching has completed as it executes asynchronously).

    The benefits of this count after persist are as follows:

    1. No action (but the count itself) will "suffer" the extra time for caching

    2. The time between this line and the place where the cached rdd2 is used could be enough to fully complete the caching and hence the time would be used better (without extra "slowdown" for caching)

    So when you asked:

    would persist be considered a transformation or an action or neither?

    I'd say it's neither and consider it an optimization hint (that may or may not be executed or taken into account ever).
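
    One way to see why persist behaves like a marker rather than a transformation or an action is to inspect the RDD right after the call, before any action has run. This is a small sketch under the assumption of a local SparkSession; the object name PersistIsAMarker is hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object PersistIsAMarker {
  // Returns: (level was NONE before persist,
  //           level is MEMORY_AND_DISK after persist,
  //           the RDD is registered as persistent with the SparkContext)
  def run(): (Boolean, Boolean, Boolean) = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("persist-is-a-marker")
      .getOrCreate()
    val sc = spark.sparkContext
    try {
      val rdd = sc.parallelize(1 to 10).map(_ + 1)
      val noneBefore = rdd.getStorageLevel == StorageLevel.NONE

      // persist returns the same RDD with a storage level recorded on it;
      // no action is called anywhere in this method.
      rdd.persist(StorageLevel.MEMORY_AND_DISK)
      val levelAfter = rdd.getStorageLevel == StorageLevel.MEMORY_AND_DISK

      // The RDD is tracked by id among the RDDs marked as persistent.
      val registered = sc.getPersistentRDDs.contains(rdd.id)

      (noneBefore, levelAfter, registered)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

    Note that getPersistentRDDs reports RDDs that have been marked, whether or not any partition has been computed and stored yet.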


    Use web UI's Storage tab to see what Datasets (as their underlying RDDs) have already been persisted.

    You can also see cache or persist operators' output using explain (or simply QueryExecution.optimizedPlan).

    val q1 = spark.range(10).groupBy('id % 5).agg(count("*") as "count").cache
    scala> q1.explain
    == Physical Plan ==
    *(1) ColumnarToRow
    +- InMemoryTableScan [(id % 5)#120L, count#119L]
          +- InMemoryRelation [(id % 5)#120L, count#119L], StorageLevel(disk, memory, deserialized, 1 replicas)
                +- *(2) HashAggregate(keys=[(id#0L % 5)#8L], functions=[count(1)])
                   +- Exchange hashpartitioning((id#0L % 5)#8L, 200), true, [id=#13]
                      +- *(1) HashAggregate(keys=[(id#0L % 5) AS (id#0L % 5)#8L], functions=[partial_count(1)])
                         +- *(1) Range (0, 10, step=1, splits=16)
    
    scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
    00 InMemoryRelation [(id % 5)#5L, count#4L], StorageLevel(disk, memory, deserialized, 1 replicas)
    01    +- *(2) HashAggregate(keys=[(id#0L % 5)#8L], functions=[count(1)], output=[(id % 5)#5L, count#4L])
    02       +- Exchange hashpartitioning((id#0L % 5)#8L, 200), true, [id=#13]
    03          +- *(1) HashAggregate(keys=[(id#0L % 5) AS (id#0L % 5)#8L], functions=[partial_count(1)], output=[(id#0L % 5)#8L, count#10L])
    04             +- *(1) Range (0, 10, step=1, splits=16)
    

    Please note that the count above is a standard function, not an action, so no caching happens. It's just a coincidence that count is the name of both a standard function and a Dataset action.

    You can also cache a table using pure SQL (this is eager!):

    // That registers range5 to contain the output of range(5) function
    spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
    val q2 = spark.sql("SELECT * FROM range5")
    scala> q2.explain
    == Physical Plan ==
    *(1) ColumnarToRow
    +- Scan In-memory table `range5` [id#51L]
          +- InMemoryRelation [id#51L], StorageLevel(disk, memory, deserialized, 1 replicas)
                +- *(1) Range (0, 5, step=1, splits=16)
    

    The InMemoryTableScan physical operator (with the InMemoryRelation logical plan) is how you can make sure that the query is cached in memory and hence reused.
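
    The same check can be done programmatically instead of reading the explain output. The sketch below assumes a local SparkSession and introduces a hypothetical helper, usesCache, that looks for an InMemoryRelation node in the optimized logical plan.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.execution.columnar.InMemoryRelation

object CachedPlanCheck {
  // Hypothetical helper: true if the optimized logical plan reads from the cache.
  def usesCache(ds: Dataset[_]): Boolean =
    ds.queryExecution.optimizedPlan.collect { case r: InMemoryRelation => r }.nonEmpty

  // Returns (cached query uses the cache, uncached query uses the cache).
  def run(): (Boolean, Boolean) = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("cached-plan-check")
      .getOrCreate()
    try {
      // cache is called before the plan is inspected, so the plan picks up the cached relation.
      val cached = spark.range(5).cache()
      // A different query that was never cached.
      val uncached = spark.range(10)

      (usesCache(cached), usesCache(uncached))
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

    No action runs in this sketch, so a plan containing InMemoryRelation only says that the query will go through the cache; whether the data has already been materialized is a separate matter.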


    Moreover, Spark SQL itself uses the same pattern to trigger DataFrame caching for SQL's CACHE TABLE query (which, unlike RDD caching, is by default eager):

    if (!isLazy) {
      // Performs eager caching
      sparkSession.table(tableIdent).count()
    }
    

    That means that, depending on the operators you use, you may get different results as far as caching is concerned. The cache and persist operators are lazy by default, while SQL's CACHE TABLE is eager.
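
    The difference can be made visible with SQL's LAZY keyword. The following is a rough sketch assuming a local SparkSession; the UDF name tracked, the accumulator udfCalls and the table names lazy5 and eager7 are all hypothetical and exist only to count how often the cached query is actually evaluated.

```scala
import org.apache.spark.sql.SparkSession

object EagerVsLazyCacheTable {
  // Returns the number of UDF calls seen after CACHE LAZY TABLE
  // and after a subsequent (eager) CACHE TABLE.
  def run(): (Long, Long) = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("eager-vs-lazy-cache-table")
      .getOrCreate()
    try {
      // Hypothetical accumulator and UDF used purely as an evaluation counter.
      val calls = spark.sparkContext.longAccumulator("udfCalls")
      spark.udf.register("tracked", (x: Long) => { calls.add(1); x })

      // LAZY only registers the query for caching; nothing is evaluated yet.
      spark.sql("CACHE LAZY TABLE lazy5 AS SELECT tracked(id) AS id FROM range(5)")
      val afterLazy = calls.sum

      // Without LAZY the statement is eager and evaluates the query right away.
      // A different range is used so that this query does not match lazy5's cached plan.
      spark.sql("CACHE TABLE eager7 AS SELECT tracked(id) AS id FROM range(7)")
      val afterEager = calls.sum

      (afterLazy, afterEager)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

    The first value should be 0 and the second greater than 0: the lazy statement defers all work to the first action on lazy5, while the eager one pays the caching cost immediately.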