Tags: azure, pyspark, apache-spark-sql, azure-databricks

What is the order of execution of queries if it has multiple joins in SparkSQL?


Suppose I have a Parquet dataset p1 partitioned on column c. I create a dataframe on p1, join it with another dataframe, and filter on column c. Will partitioning on c help reduce execution time?

df = spark.read.parquet("<path to p1>")  # p1 is partitioned on column c
df.createOrReplaceTempView('t1')  # createOrReplaceTempView returns None, so call it separately
df2 = spark.read.parquet("<path to someNonPartitionedFile>")
df2.createOrReplaceTempView('t2')

spark.sql("select * from t1 join t2 on t1.keycolumn = t2.keycolumn where c = 'somevalue'")

What will be the order of execution? Will partitioning data reduce query time?


Solution

  • Yes. Partitioning the data helps reduce query time because Spark only needs to read the relevant partitions instead of scanning the entire dataset (partition pruning). Since you join the partitioned dataframe with another dataframe and filter on the partitioning column c, Spark's optimizer pushes the filter on c below the join, so only the matching partition of t1 is read and the join runs on that already-filtered data. This reduces the execution time.

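    I agree with @Steven: you can check the query plan with result.explain(True), or in the Spark UI. In the output below, the Filter on c sits under the Join in the optimized logical plan, and the t1 scan lists PartitionFilters: [isnotnull(c#792), (c#792 = 2024-01-01)]. So the order of execution is: read only the c = 2024-01-01 partition of t1, join it with t2 (here as a broadcast hash join with t1 as the build side, BuildLeft), then return the result.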

    result = spark.sql("SELECT * FROM t1 JOIN t2 ON t1.keycolumn = t2.keycolumn WHERE c = '2024-01-01'")
    result.explain(True)
    

    Results:

    == Optimized Logical Plan ==
    Join Inner, (keycolumn#790L = keycolumn#796L)
    :- Filter ((isnotnull(c#792) AND isnotnull(keycolumn#790L)) AND (c#792 = 2024-01-01))
    :  +- Relation [keycolumn#790L,value#791,c#792] parquet
    +- Filter isnotnull(keycolumn#796L)
       +- Relation [keycolumn#796L,other_value#797] parquet
    
    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- == Initial Plan ==
       ColumnarToRow
       +- PhotonResultStage
          +- PhotonBroadcastHashJoin [keycolumn#790L], [keycolumn#796L], Inner, BuildLeft, false, true
             :- PhotonShuffleExchangeSource
             :  +- PhotonShuffleMapStage
             :     +- PhotonShuffleExchangeSink SinglePartition
             :        +- PhotonScan parquet [keycolumn#790L,value#791,c#792] DataFilters: [isnotnull(keycolumn#790L)], DictionaryFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/partitioned_data], OptionalDataFilters: [], PartitionFilters: [isnotnull(c#792), (c#792 = 2024-01-01)], ReadSchema: struct<keycolumn:bigint,value:string>, RequiredDataFilters: [isnotnull(keycolumn#790L)]
             +- PhotonScan parquet [keycolumn#796L,other_value#797] DataFilters: [isnotnull(keycolumn#796L)], DictionaryFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/non_partitioned_data], OptionalDataFilters: [hashedrelationcontains(keycolumn#796L)], PartitionFilters: [], ReadSchema: struct<keycolumn:bigint,other_value:string>, RequiredDataFilters: [isnotnull(keycolumn#796L)]
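To illustrate why pruning saves work, here is a toy, standard-library-only sketch of the mechanism; it does not use Spark. It assumes Hive-style directory names such as c=2024-01-01 (the layout DataFrameWriter.partitionBy produces) and uses small CSV files as a stand-in for Parquet. The helpers write_partitioned and files_to_scan are hypothetical. Only files under the folder that matches the filter value are listed for reading, which is roughly what the PartitionFilters entry on the t1 scan expresses.

```python
import tempfile
from pathlib import Path


def write_partitioned(root, rows, partition_col):
    """Hypothetical helper: write rows into Hive-style folders root/<col>=<value>/part-0.csv.

    Toy stand-in for Parquet: the partition column is encoded only in the
    directory name, not in the file contents.
    """
    by_value = {}
    for row in rows:
        by_value.setdefault(row[partition_col], []).append(row)
    for value, part_rows in by_value.items():
        part_dir = Path(root) / f"{partition_col}={value}"
        part_dir.mkdir(parents=True, exist_ok=True)
        with open(part_dir / "part-0.csv", "w") as f:
            for r in part_rows:
                f.write(f"{r['keycolumn']},{r['value']}\n")


def files_to_scan(root, partition_col, wanted):
    """Hypothetical helper: list files only under <col>=<wanted>; other folders are never listed."""
    target = f"{partition_col}={wanted}"
    selected = []
    for part_dir in sorted(Path(root).iterdir()):
        if part_dir.is_dir() and part_dir.name == target:
            selected.extend(sorted(part_dir.iterdir()))
    return selected


rows = [
    {"keycolumn": 1, "value": "v1", "c": "2024-01-01"},
    {"keycolumn": 2, "value": "v2", "c": "2024-01-02"},
    {"keycolumn": 3, "value": "v3", "c": "2024-01-03"},
]

with tempfile.TemporaryDirectory() as root:
    write_partitioned(root, rows, "c")
    # Same idea as: WHERE c = '2024-01-01'
    scanned = files_to_scan(root, "c", "2024-01-01")
    scanned_names = [p.relative_to(root).as_posix() for p in scanned]

print(scanned_names)  # ['c=2024-01-01/part-0.csv']
```

With three partitions, only one file is selected; in a real table with many dates the saving grows with the number of partitions skipped.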
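On the order of execution: the SQL text reads as "join, then filter", but the optimized plan filters t1 on c first and joins afterwards, with t1 as the build side (BuildLeft). The toy sketch below is plain Python rather than Spark, with a hypothetical hash_join helper and made-up sample rows. It shows that both orders return the same rows, while the pushed-down version hands the join a much smaller build side.

```python
def hash_join(left, right, key):
    """Hypothetical inner hash join: build a lookup table on `left`, then probe it with `right`."""
    build = {}
    for row in left:
        build.setdefault(row[key], []).append(row)
    joined = []
    for r in right:
        for l_row in build.get(r[key], []):
            joined.append({**l_row, **r})
    return joined


# Made-up sample data: every third t1 row has c = '2024-01-01'
t1 = [
    {"keycolumn": k, "value": f"v{k}", "c": "2024-01-01" if k % 3 == 0 else "2024-01-02"}
    for k in range(9)
]
t2 = [{"keycolumn": k, "other_value": f"o{k}"} for k in range(9)]

# Order as written in the SQL text: join all rows, then filter on c
joined_then_filtered = [r for r in hash_join(t1, t2, "keycolumn") if r["c"] == "2024-01-01"]

# Order in the optimized plan: filter t1 on c first, then join (t1 is the build side)
pushed_down_t1 = [r for r in t1 if r["c"] == "2024-01-01"]
filtered_then_joined = hash_join(pushed_down_t1, t2, "keycolumn")

print(len(t1), len(pushed_down_t1), joined_then_filtered == filtered_then_joined)  # 9 3 True
```

Spark's optimizer performs this rewrite itself (predicate pushdown through the inner join), so you do not need to reorder the query by hand.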