What is the ShuffleQueryStage box that I see in Spark DAGs? How is it different from the Exchange box in the Spark stages?
ShuffleQueryStage is related to AQE (Adaptive Query Execution). A ShuffleQueryStage is added on top of every Exchange. It materializes the results of that stage so that the remaining part of the plan can be re-optimized based on the runtime statistics of the finished stage.
So, in my opinion, the short answer is:
Exchange - this is where your data is shuffled
ShuffleQueryStage - added for AQE, to use runtime statistics and re-optimize the plan
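A deliberately simplified model of this split is sketched below in plain Scala, with no Spark dependency. The case classes only borrow Spark's node names; `outputSize`, `createStages`, `reoptimize`, and all the sizes are hypothetical (the 56-byte right side just echoes the size AQE reports for the small in-code DataFrame in this answer). The Exchange still stands for the shuffle, while the ShuffleQueryStage wrapped around it carries the size measured after it ran, and a later pass uses that size to decide whether the join can become a broadcast join.

```scala
// Toy model of AQE query stages. These case classes only borrow Spark's node
// names; they are NOT Spark's classes, and the sizes are made-up numbers.
sealed trait Plan extends Product with Serializable
final case class Scan(name: String, sizeInBytes: Long) extends Plan
final case class Filter(child: Plan, keepPercent: Int) extends Plan
final case class Exchange(child: Plan) extends Plan // the shuffle itself
final case class SortMergeJoin(left: Plan, right: Plan) extends Plan
final case class BroadcastHashJoin(left: Plan, right: Plan) extends Plan
// A materialized Exchange plus the size observed after it ran ("isRuntime=true").
final case class ShuffleQueryStage(id: Int, exchange: Exchange, sizeInBytes: Long) extends Plan

// Stand-in for measuring a stage's output once it has actually executed.
def outputSize(p: Plan): Long = p match {
  case Scan(_, size)                 => size
  case Filter(child, keepPercent)    => outputSize(child) * keepPercent / 100
  case Exchange(child)               => outputSize(child)
  case SortMergeJoin(l, r)           => outputSize(l) + outputSize(r)
  case BroadcastHashJoin(l, r)       => outputSize(l) + outputSize(r)
  case ShuffleQueryStage(_, _, size) => size
}

// Wrap every Exchange, bottom-up, in a ShuffleQueryStage carrying its measured size.
// Returns the new plan and the next free stage id.
def createStages(p: Plan, nextId: Int): (Plan, Int) = p match {
  case Exchange(child) =>
    val (newChild, id) = createStages(child, nextId)
    val exchange = Exchange(newChild)
    (ShuffleQueryStage(id, exchange, outputSize(exchange)), id + 1)
  case Filter(child, keepPercent) =>
    val (newChild, id) = createStages(child, nextId)
    (Filter(newChild, keepPercent), id)
  case SortMergeJoin(l, r) =>
    val (newL, id1) = createStages(l, nextId)
    val (newR, id2) = createStages(r, id1)
    (SortMergeJoin(newL, newR), id2)
  case BroadcastHashJoin(l, r) =>
    val (newL, id1) = createStages(l, nextId)
    val (newR, id2) = createStages(r, id1)
    (BroadcastHashJoin(newL, newR), id2)
  case leaf => (leaf, nextId) // Scan or an already materialized stage
}

// Re-plan with runtime statistics: if the right side's measured size is within
// the threshold, turn the sort-merge join into a broadcast join. A negative
// threshold disables this, like autoBroadcastJoinThreshold = -1 in the answer.
def reoptimize(p: Plan, broadcastThreshold: Long): Plan = p match {
  case SortMergeJoin(l, r) =>
    val newL = reoptimize(l, broadcastThreshold)
    val newR = reoptimize(r, broadcastThreshold)
    newR match {
      case s: ShuffleQueryStage if broadcastThreshold >= 0 && s.sizeInBytes <= broadcastThreshold =>
        BroadcastHashJoin(newL, newR)
      case _ => SortMergeJoin(newL, newR)
    }
  case Filter(child, keepPercent) => Filter(reoptimize(child, broadcastThreshold), keepPercent)
  case other => other
}

// Rough shape of the join in the answer: a filtered CSV scan joined with a tiny table.
val plan = SortMergeJoin(
  Exchange(Filter(Scan("city_temperature.csv", 1000000L), keepPercent = 1)),
  Exchange(Scan("input2", 56L))
)
val (staged, _) = createStages(plan, 0)
val broadcastDisabled = reoptimize(staged, -1L)
val broadcastEnabled = reoptimize(staged, 10L * 1024 * 1024)
println(broadcastDisabled.productPrefix) // SortMergeJoin
println(broadcastEnabled.productPrefix)  // BroadcastHashJoin
```

Real AQE is incremental: it submits the stages whose inputs are ready, waits for them, re-plans the rest with the new statistics, and repeats, whereas this model does everything in one bottom-up pass. With the threshold at -1, matching the answer's setting, the join stays a SortMergeJoin, as in the AQE plan here.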
The example below demonstrates this mechanism.
Here is the sample code:
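import org.apache.spark.sql.functions._
// Needed for .toDF on a Seq (imported automatically in spark-shell and Databricks notebooks)
import spark.implicits._

// Disable broadcast joins so that both sides of the join have to be shuffled
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Enable Adaptive Query Execution (set to false to get the plan without ShuffleQueryStage)
spark.conf.set("spark.sql.adaptive.enabled", true)

val input = spark.read
  .format("csv")
  .option("header", "true")
  .load(
    "dbfs:/FileStore/shared_uploads/**@gmail.com/city_temperature.csv"
  )

val dataForInput2 = Seq(
  ("Algeria", "3"),
  ("Germany", "3"),
  ("France", "5"),
  ("Poland", "7"),
  ("test55", "86")
)

val input2 = dataForInput2
  .toDF("Country", "Value")
  .withColumn("test", lit("test"))

val joinedDfs = input.join(input2, Seq("Country"))

// Filter after the join, then force one more shuffle with repartition
val finalResult =
  joinedDfs.filter(input("Country") === "Poland").repartition(200)

finalResult.show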
I am reading the data from a file, but you can replace it with a small DataFrame created in code: broadcast joins are disabled, so the join will still be a shuffle join. The withColumn and repartition calls are only there to make the plan more interesting.
First, let's look at the plan with AQE disabled (spark.sql.adaptive.enabled set to false):
== Physical Plan ==
CollectLimit (11)
+- Exchange (10)
   +- * Project (9)
      +- * SortMergeJoin Inner (8)
         :- Sort (4)
         :  +- Exchange (3)
         :     +- * Filter (2)
         :        +- Scan csv (1)
         +- Sort (7)
            +- Exchange (6)
               +- LocalTableScan (5)
Now the same query with AQE enabled:
== Physical Plan ==
AdaptiveSparkPlan (25)
+- == Final Plan ==
   CollectLimit (16)
   +- ShuffleQueryStage (15), Statistics(sizeInBytes=1447.8 KiB, rowCount=9.27E+3, isRuntime=true)
      +- Exchange (14)
         +- * Project (13)
            +- * SortMergeJoin Inner (12)
               :- Sort (6)
               :  +- AQEShuffleRead (5)
               :     +- ShuffleQueryStage (4), Statistics(sizeInBytes=1158.3 KiB, rowCount=9.27E+3, isRuntime=true)
               :        +- Exchange (3)
               :           +- * Filter (2)
               :              +- Scan csv (1)
               +- Sort (11)
                  +- AQEShuffleRead (10)
                     +- ShuffleQueryStage (9), Statistics(sizeInBytes=56.0 B, rowCount=1, isRuntime=true)
                        +- Exchange (8)
                           +- LocalTableScan (7)
The code is the same and the only difference is AQE, but now a ShuffleQueryStage has appeared on top of every Exchange, each carrying runtime statistics (isRuntime=true) collected after that stage ran.
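One way to double-check this observation on the printed plans is sketched below in plain Scala. It assumes Spark's indented tree layout, where each child's `+- ` or `:- ` marker lines up with the first character of its parent's node text; the helper names (`nodeStart`, `nodeName`, `exchangesUnderQueryStage`) are hypothetical and not part of Spark. It is a text heuristic for reading plans, not a way to query Spark itself. The two strings are the plans from this answer, laid out with that indentation.

```scala
// Hypothetical helpers (not a Spark API) for reading plan text in Spark's
// indented tree layout.

// Column where a line's node text starts (right after its last tree marker).
def nodeStart(line: String): Int = {
  val markerEnds = Seq(line.lastIndexOf("+- "), line.lastIndexOf(":- ")).filter(_ >= 0).map(_ + 3)
  if (markerEnds.nonEmpty) markerEnds.max else line.indexWhere(_ != ' ')
}

// Operator name such as "Exchange" or "ShuffleQueryStage" (codegen "* " prefix dropped).
def nodeName(line: String): String =
  line.substring(nodeStart(line)).stripPrefix("* ").takeWhile(c => c != ' ' && c != '(' && c != ',')

// For every Exchange, in plan order: is its direct parent a ShuffleQueryStage?
def exchangesUnderQueryStage(plan: String): List[Boolean] = {
  val lines = plan.linesIterator.filter(_.trim.nonEmpty).toVector
  lines.indices.collect {
    case i if nodeName(lines(i)) == "Exchange" =>
      val start = nodeStart(lines(i))
      // The parent is the nearest line above whose node text starts further left.
      val parent = (i - 1 to 0 by -1).find(j => nodeStart(lines(j)) < start)
      parent.exists(j => nodeName(lines(j)) == "ShuffleQueryStage")
  }.toList
}

val withoutAqe =
  """|CollectLimit (11)
     |+- Exchange (10)
     |   +- * Project (9)
     |      +- * SortMergeJoin Inner (8)
     |         :- Sort (4)
     |         :  +- Exchange (3)
     |         :     +- * Filter (2)
     |         :        +- Scan csv (1)
     |         +- Sort (7)
     |            +- Exchange (6)
     |               +- LocalTableScan (5)""".stripMargin

val withAqe =
  """|AdaptiveSparkPlan (25)
     |+- == Final Plan ==
     |   CollectLimit (16)
     |   +- ShuffleQueryStage (15), Statistics(sizeInBytes=1447.8 KiB, rowCount=9.27E+3, isRuntime=true)
     |      +- Exchange (14)
     |         +- * Project (13)
     |            +- * SortMergeJoin Inner (12)
     |               :- Sort (6)
     |               :  +- AQEShuffleRead (5)
     |               :     +- ShuffleQueryStage (4), Statistics(sizeInBytes=1158.3 KiB, rowCount=9.27E+3, isRuntime=true)
     |               :        +- Exchange (3)
     |               :           +- * Filter (2)
     |               :              +- Scan csv (1)
     |               +- Sort (11)
     |                  +- AQEShuffleRead (10)
     |                     +- ShuffleQueryStage (9), Statistics(sizeInBytes=56.0 B, rowCount=1, isRuntime=true)
     |                        +- Exchange (8)
     |                           +- LocalTableScan (7)""".stripMargin

println(exchangesUnderQueryStage(withoutAqe)) // List(false, false, false)
println(exchangesUnderQueryStage(withAqe))    // List(true, true, true)
```

With AQE disabled, every Exchange sits directly under a Sort or CollectLimit; with AQE enabled, each one sits directly under a ShuffleQueryStage.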
Let's look at the DAG visualisation, as in your example.
First, job 3, which includes the join.
Then there is job 4, which just reuses what was computed previously but adds an additional 4th stage with a ShuffleQueryStage, similar to your case.