I am new to Spark and trying to understand its internals. I am reading a small 50 MB Parquet file from S3, performing a group by, and then saving the result back to S3. When I observe the Spark UI, I can see 3 stages created for this:
Stage 0: load (1 task)
Stage 1: shufflequerystage for grouping (12 tasks)
Stage 2: save (coalescedshufflereader) (26 tasks)
Code Sample:
from pyspark.sql import functions as F

df = spark.read.format("parquet").load(src_loc)
df_agg = df.groupby(grp_attribute) \
    .agg(F.sum("no_of_launches").alias("no_of_launchesGroup"))
df_agg.write.mode("overwrite").parquet(target_loc)
I am using an EMR cluster with 1 master and 3 core nodes (each with 4 vCores), so the default parallelism is 12. I am not changing any config at runtime, but I cannot understand why 26 tasks are created in the final stage. As I understand it, the default number of shuffle partitions should be 200. A screenshot of the UI is attached.
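For reference, this is how the settings involved can be checked; a minimal sketch that assumes the SparkSession is available as spark (the config keys are standard Spark SQL settings):

print(spark.sparkContext.defaultParallelism)                  # 12 on this cluster (3 nodes x 4 vCores)
print(spark.conf.get("spark.sql.shuffle.partitions"))         # 200 by default
print(spark.conf.get("spark.sql.adaptive.enabled", "false"))  # if true, AQE may coalesce shuffle partitions

If adaptive query execution is enabled, the post-shuffle task count can differ from spark.sql.shuffle.partitions, which may explain seeing a number other than 200.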
I tried similar logic on Databricks with Spark 2.4.5.

With spark.conf.set('spark.sql.adaptive.enabled', 'true'), the final number of my partitions is 2.

With spark.conf.set('spark.sql.adaptive.enabled', 'false') and spark.conf.set('spark.sql.shuffle.partitions', 75), the final number of my partitions is 75.

print(df_agg.rdd.getNumPartitions()) reveals this.
So the job output shown in the Spark UI does not reflect this. Maybe a repartition occurs at the end. Interesting, but not really an issue.
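For completeness, the comparison was roughly the following; a minimal sketch using the same placeholder names as above (src_loc, grp_attribute):

from pyspark.sql import functions as F

# Run the same aggregation with AQE disabled and an explicit shuffle partition count.
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", "75")

df = spark.read.format("parquet").load(src_loc)
df_agg = df.groupby(grp_attribute) \
    .agg(F.sum("no_of_launches").alias("no_of_launchesGroup"))
print(df_agg.rdd.getNumPartitions())   # 75 here; 2 when spark.sql.adaptive.enabled is 'true'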