I'm studying for the Databricks Spark certification exam, and their practice exam (please see https://databricks-prod-cloudfront.cloud.databricks.com/public/793177bc53e528530b06c78a4fa0e086/0/6221173/100020/latest.html ) requires us to accept this statement as true:
"Explicit caching can decrease application performance by interfering with the Catalyst optimizer's ability to optimize some queries"
I got this question wrong even though I have read up a lot on Catalyst and have a pretty good grasp of the details. So I wanted to shore up my knowledge of this topic and go to a source that explains the hows and whys behind this assertion.
Can anyone provide guidance on this? Specifically, why is this so, and how do we ensure that when we cache our datasets we are not actually getting in the way of the optimizer and making things worse? Thanks!
Let's use a simple example to demonstrate this:
// Some data
val df = spark.range(100)
df.join(df, Seq("id")).filter('id <20).explain(true)
Here, Catalyst optimizes the join by pushing the filter down onto each DataFrame before joining, which reduces the amount of data that gets shuffled.
== Optimized Logical Plan ==
Project [id#0L]
+- Join Inner, (id#0L = id#69L)
   :- Filter (id#0L < 20)
   :  +- Range (0, 100, step=1, splits=Some(4))
   +- Filter (id#69L < 20)
      +- Range (0, 100, step=1, splits=Some(4))
If we cache the result of the join and only then filter, the query is not as well optimized, as we can see here:
df.join(df, Seq("id")).cache.filter('id <20).explain(true)
== Optimized Logical Plan ==
Filter (id#0L < 20)
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- *Project [id#0L]
         +- *BroadcastHashJoin [id#0L], [id#74L], Inner, BuildRight
            :- *Range (0, 100, step=1, splits=4)
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
               +- *Range (0, 100, step=1, splits=4)
The filter is applied at the very end, after the whole join has been computed.
Why? Because cache materializes the DataFrame (in memory and, if needed, on disk, as the StorageLevel in the plan shows). Every subsequent query reads from this cached DataFrame, so Catalyst can only optimize the part of the query AFTER the cache. We can check that with the same example:
df.join(df, Seq("id")).cache.join(df, Seq("id")).filter('id <20).explain(true)
== Optimized Logical Plan ==
Project [id#0L]
+- Join Inner, (id#0L = id#92L)
   :- Filter (id#0L < 20)
   :  +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :        +- *Project [id#0L]
   :           +- *BroadcastHashJoin [id#0L], [id#74L], Inner, BuildRight
   :              :- *Range (0, 100, step=1, splits=4)
   :              +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   :                 +- *Range (0, 100, step=1, splits=4)
   +- Filter (id#92L < 20)
      +- Range (0, 100, step=1, splits=Some(4))
The filter is pushed below the second join, but it stays above the first join because the result of that join is cached.
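For this particular example, one way to keep the pushdown is to apply the filter before calling cache, so that the filter is part of the plan being cached. The following is a minimal sketch rather than a definitive recipe. It assumes a local SparkSession created just for illustration (in spark-shell, `spark` already exists), and the names `filtered`, `planBeforeCache` and `rowCount` are only illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session; in spark-shell `spark` is already defined.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("filter-before-cache")
  .getOrCreate()
import spark.implicits._

val df = spark.range(100)

// Filter first, then cache: the filter belongs to the plan that gets cached,
// so Catalyst is still free to push it below the join.
val filtered = df.join(df, Seq("id")).filter($"id" < 20)

// Inspect the optimized plan before anything is cached.
val planBeforeCache = filtered.queryExecution.optimizedPlan.toString
println(planBeforeCache)

filtered.cache()
val rowCount = filtered.count() // the first action materializes the cache
println(rowCount)

// Release the cached data once it is no longer needed.
filtered.unpersist()
```

The trade-off is that only the filtered rows are cached, so this helps when later queries need the same filter, not when they need the full join result.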
As for how to avoid this: know what you are doing! You can simply compare the Catalyst plans with and without the cache and see which optimizations Spark is missing.
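A rough sketch of that comparison, under a few assumptions: a local SparkSession created for illustration, a hypothetical helper `optimizedPlanOf`, and a simple string check for `InMemoryRelation` as a stand-in for reading the plans by eye. Note that the uncached plan has to be captured before calling cache, because once a DataFrame is registered in the cache, later queries that contain the same plan are rewritten to read from it.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Assumption: a throwaway local session; in spark-shell `spark` is already defined.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("compare-plans")
  .getOrCreate()
import spark.implicits._

// Hypothetical helper: the optimized logical plan rendered as text.
def optimizedPlanOf(ds: Dataset[_]): String =
  ds.queryExecution.optimizedPlan.toString

val df = spark.range(100)
val joined = df.join(df, Seq("id"))

// Capture the plan BEFORE caching, so it reflects what Catalyst does on its own.
val withoutCache = optimizedPlanOf(joined.filter($"id" < 20))

// Mark the join result as cached (lazy: nothing is computed yet),
// then build the same query on top of it.
joined.cache()
val withCache = optimizedPlanOf(joined.filter($"id" < 20))

println("=== without cache ===")
println(withoutCache)
println("=== with cache ===")
println(withCache)

// Crude check: does the plan read from an InMemoryRelation?
val readsFromCache = withCache.contains("InMemoryRelation")
println(s"reads from cache: $readsFromCache")

joined.unpersist()
```

If the plan without the cache shows filters sitting directly above the data sources while the plan with the cache shows them above an InMemoryRelation, the cache is sitting between the optimizer and the pushdown.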