I'm somewhat new to PySpark. I understand that it uses lazy evaluation, meaning that execution of a series of transformations will be deferred until some action is requested, at which point the Spark engine optimizes the entire set of transformations and then executes them.
Therefore, from both a functional and a performance standpoint, I would expect this:
Approach A
df = spark.read.parquet(path)
df = df.filter(F.col('state') == 'CA')
df = df.select('id', 'name', 'subject')
df = df.groupBy('subject').count()
to be the same as this:
Approach B
df = spark.read.parquet(path)\
    .filter(F.col('state') == 'CA')\
    .select('id', 'name', 'subject')\
    .groupBy('subject').count()
I like the style of Approach A because it would allow me to break up my logic into smaller (and re-usable) functions.
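For example, I imagine being able to write something like this (a rough sketch only; the helper function names are made up, and DataFrame.transform requires Spark 3.0+):

from pyspark.sql import DataFrame, functions as F

def only_california(df: DataFrame) -> DataFrame:
    return df.filter(F.col('state') == 'CA')

def project_core_columns(df: DataFrame) -> DataFrame:
    return df.select('id', 'name', 'subject')

def count_by_subject(df: DataFrame) -> DataFrame:
    return df.groupBy('subject').count()

# Compose the small, reusable functions into the same pipeline as Approach A.
df = (spark.read.parquet(path)
      .transform(only_california)
      .transform(project_core_columns)
      .transform(count_by_subject))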
However, I have come across a few blog posts (e.g. here, here and here) that are confusing to me. These posts deal specifically with the use of successive withColumn statements and suggest using a single select instead. The underlying rationale seems to be that, since DataFrames are immutable, successive withColumn usage is detrimental to performance.

So my question is... would I have that same performance issue when using Approach A? Or is it just an issue specific to withColumn?
There are in fact two questions here.

First question, with code samples. Unlike the first answer, I disagree. Run both code segments with .explain() and you will see that the generated Physical Plan for execution is exactly the same.
Spark is based on lazy evaluation. That is to say:
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.
The upshot of all this is that I ran code similar to yours with two filters applied, and noted that, because the action .count triggers just-in-time evaluation, Catalyst collapsed the first and the second filter into a single predicate. This is sometimes called "code fusing", and it is only possible because of late execution, a.k.a. lazy evaluation.
Snippet 1 and Physical Plan
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

data = [("James", "", "Smith", "36636", "M", 3000),
        ("Michael", "Rose", "", "40288", "M", 4000),
        ("Robert", "", "Williams", "42114", "M", 4000),
        ("Maria", "Anne", "Jones", "39192", "F", 4000),
        ("Jen", "Mary", "Brown", "", "F", -1)]

schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])

df = spark.createDataFrame(data=data, schema=schema)

# Approach A style: one assignment per transformation.
df = df.filter(col('lastname') == 'Jones')
df = df.select('firstname', 'lastname', 'salary')
df = df.filter(col('lastname') == 'Jones2')
df.groupBy('lastname').count().explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[lastname#212], functions=[finalmerge_count(merge count#233L) AS count(1)#228L])
+- Exchange hashpartitioning(lastname#212, 200), ENSURE_REQUIREMENTS, [plan_id=391]
+- HashAggregate(keys=[lastname#212], functions=[partial_count(1) AS count#233L])
+- Project [lastname#212]
+- Filter (isnotnull(lastname#212) AND ((lastname#212 = Jones) AND (lastname#212 = Jones2)))
+- Scan ExistingRDD[firstname#210,middlename#211,lastname#212,id#213,gender#214,salary#215]
Snippet 2 and Same Physical Plan
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

data2 = [("James", "", "Smith", "36636", "M", 3000),
         ("Michael", "Rose", "", "40288", "M", 4000),
         ("Robert", "", "Williams", "42114", "M", 4000),
         ("Maria", "Anne", "Jones", "39192", "F", 4000),
         ("Jen", "Mary", "Brown", "", "F", -1)]

schema2 = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])

df2 = spark.createDataFrame(data=data2, schema=schema2)

# Approach B style: the same transformations, chained.
df2.filter(col('lastname') == 'Jones')\
   .select('firstname', 'lastname', 'salary')\
   .filter(col('lastname') == 'Jones2')\
   .groupBy('lastname').count().explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[lastname#299], functions=[finalmerge_count(merge count#320L) AS count(1)#315L])
+- Exchange hashpartitioning(lastname#299, 200), ENSURE_REQUIREMENTS, [plan_id=577]
+- HashAggregate(keys=[lastname#299], functions=[partial_count(1) AS count#320L])
+- Project [lastname#299]
+- Filter (isnotnull(lastname#299) AND ((lastname#299 = Jones) AND (lastname#299 = Jones2)))
+- Scan ExistingRDD[firstname#297,middlename#298,lastname#299,id#300,gender#301,salary#302]
Second Question - withColumn
Doing this:
df = df.filter(col('lastname') == 'Jones')
df = df.select('firstname', 'lastname', 'salary')
df = df.withColumn("salary100", col("salary") * 100)
df = df.withColumn("salary200", col("salary") * 200)
df.explain()
or writing it via chaining gives the same result as well. In other words, it does not matter how you write the transformations; the final Physical Plan is what counts. That optimization does carry some overhead, though. Depending on how you weigh that, a single select is the alternative (sketched after the plan below).
== Physical Plan ==
*(1) Project [firstname#399, lastname#401, salary#404, (salary#404 * 100) AS salary100#414, (salary#404 * 200) AS salary200#419]
+- *(1) Filter (isnotnull(lastname#401) AND (lastname#401 = Jones))
+- *(1) Scan ExistingRDD[firstname#399,middlename#400,lastname#401,id#402,gender#403,salary#404]
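As a rough sketch, the same snippet could be written with the two withColumn calls folded into the select, so only one projection is expressed (same logic, and it should produce an equivalent plan):

df = df.filter(col('lastname') == 'Jones')
df = df.select(
    'firstname',
    'lastname',
    'salary',
    (col('salary') * 100).alias('salary100'),
    (col('salary') * 200).alias('salary200'),
)
df.explain()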
This article https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015#:~:text=Summary,number%20of%20transforms%20on%20DataFrame%20 takes a different perspective: the cost lies in the analysis/optimization work itself rather than in a 'projection' issue. I was surprised that three withColumn calls would cause any issue; if they did, that would point to poor optimization. I suspect that looping over many features is the real problem, not a small number of withColumn calls.
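To illustrate that last point, here is a sketch of the loop pattern the article warns about next to a single-select equivalent (the column names and multipliers are made up for illustration, and it assumes df still has the original salary column):

# Many withColumn calls in a loop: each call returns a new DataFrame
# and adds another projection for the analyzer to process.
multipliers = {'salary_x{}'.format(m): m for m in range(1, 51)}

df_loop = df
for name, m in multipliers.items():
    df_loop = df_loop.withColumn(name, col('salary') * m)

# Equivalent single projection: all derived columns in one select.
df_select = df.select(
    '*',
    *[(col('salary') * m).alias(name) for name, m in multipliers.items()]
)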