Tags: apache-spark, pyspark, apache-spark-sql, spark-ui

What is ShuffleQueryStage in the Spark DAG?


What is the ShuffleQueryStage box that I see in the Spark DAGs? How is it different from the Exchange box in the Spark stages?

(screenshot of the Spark UI DAG with the ShuffleQueryStage box)


Solution

  • ShuffleQueryStage is connected to AQE (Adaptive Query Execution); these nodes are added after each stage that ends with an Exchange and are used to materialize the results of that stage and to re-optimize the remaining plan based on runtime statistics.

    So imo the short answer is:

    Exchange - this is where your data gets shuffled

    ShuffleQueryStage - added for AQE purposes, to collect runtime statistics and re-optimize the remaining plan
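
    If you are not sure whether AQE is enabled in your session (it is on by default since Spark 3.2.0), you can check and toggle the flag like this:

    // Check whether AQE is enabled; the default is true since Spark 3.2.0.
    println(spark.conf.get("spark.sql.adaptive.enabled"))
    // Turn it on for the current session if needed.
    spark.conf.set("spark.sql.adaptive.enabled", true)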

    In the example below I am trying to show this mechanism.

    Here is the sample code:

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Disable broadcast joins so the join has to shuffle data,
    // and enable Adaptive Query Execution (AQE).
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    spark.conf.set("spark.sql.adaptive.enabled", true)

    val input = spark.read
      .format("csv")
      .option("header", "true")
      .load(
        "dbfs:/FileStore/shared_uploads/**@gmail.com/city_temperature.csv"
      )
    val dataForInput2 = Seq(
      ("Algeria", "3"),
      ("Germany", "3"),
      ("France", "5"),
      ("Poland", "7"),
      ("test55", "86")
    )
    val input2 = dataForInput2
      .toDF("Country", "Value")
      .withColumn("test", lit("test"))
    // Join on Country, then filter and repartition to add extra exchanges.
    val joinedDfs = input.join(input2, Seq("Country"))
    val finalResult =
      joinedDfs.filter(input("Country") === "Poland").repartition(200)
    finalResult.show()
    

    I am reading the data from a file, but you can replace it with a small DataFrame created in code, because I added a line that disables broadcast joins. I also added some withColumn and repartition calls to make the plan more interesting.
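
    For example, a minimal sketch of such a replacement (the columns here are made up; only Country matters for the join, and with broadcast joins disabled the plans come out essentially the same):

    // Hypothetical in-code replacement for the CSV read above; only the
    // Country column is actually used by the join and the filter.
    val input = Seq(
      ("Poland", "Warsaw", "7.5"),
      ("Germany", "Berlin", "9.1"),
      ("France", "Paris", "11.2")
    ).toDF("Country", "City", "AvgTemperature")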

    First, let's take a look at the plan with AQE disabled.
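
    A minimal sketch of how the numbered plans below can be printed, using the formatted explain mode (Spark 3.0+) and with the AQE flag switched off before the query is rebuilt (finalResultNoAqe is just a name used here for that variant):

    // Rebuild the final query with AQE switched off and print the physical
    // plan with operator IDs (formatted explain mode, Spark 3.0+).
    spark.conf.set("spark.sql.adaptive.enabled", false)
    val finalResultNoAqe =
      joinedDfs.filter(input("Country") === "Poland").repartition(200)
    finalResultNoAqe.explain("formatted")

    which gives: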

    == Physical Plan ==
    CollectLimit (11)
    +- Exchange (10)
       +- * Project (9)
          +- * SortMergeJoin Inner (8)
             :- Sort (4)
             :  +- Exchange (3)
             :     +- * Filter (2)
             :        +- Scan csv  (1)
             +- Sort (7)
                +- Exchange (6)
                   +- LocalTableScan (5)
    

    Now with AQE enabled:

    == Physical Plan ==
    AdaptiveSparkPlan (25)
    +- == Final Plan ==
       CollectLimit (16)
       +- ShuffleQueryStage (15), Statistics(sizeInBytes=1447.8 KiB, rowCount=9.27E+3, isRuntime=true)
          +- Exchange (14)
             +- * Project (13)
                +- * SortMergeJoin Inner (12)
                   :- Sort (6)
                   :  +- AQEShuffleRead (5)
                   :     +- ShuffleQueryStage (4), Statistics(sizeInBytes=1158.3 KiB, rowCount=9.27E+3, isRuntime=true)
                   :        +- Exchange (3)
                   :           +- * Filter (2)
                   :              +- Scan csv  (1)
                   +- Sort (11)
                      +- AQEShuffleRead (10)
                         +- ShuffleQueryStage (9), Statistics(sizeInBytes=56.0 B, rowCount=1, isRuntime=true)
                            +- Exchange (8)
                               +- LocalTableScan (7)
    

    The code is the same; the only difference is AQE, but now you can see that a ShuffleQueryStage popped up after each Exchange.
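
    Note that with AQE the plan is re-optimized while the query runs, so as far as I can tell the == Final Plan == section with the isRuntime=true statistics only shows up once the query has actually been executed:

    // With spark.sql.adaptive.enabled=true (as in the sample code above),
    // run the query first and then ask for the plan again - the
    // AdaptiveSparkPlan now reports the final, re-optimized plan.
    finalResult.show()
    finalResult.explain("formatted")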

    Let's take a look at the DAG visualisation, as in your example.

    First, let's take a look at job 3, which includes the join:

    (DAG visualisation of job 3)

    Then there is job 4, which just reuses what was computed previously but adds an additional 4th stage with a ShuffleQueryStage, similar to your case:

    (DAG visualisation of job 4)