Suppose I have a Parquet dataset p1 that is partitioned on column c. I create a DataFrame on p1 and then join it with another DataFrame, with a filter condition on the partitioning column c. Will this help the execution time?

df = spark.read.parquet(p1)  # p1 is partitioned on column c
df.createOrReplaceTempView('t1')

df2 = spark.read.parquet(someNonPartitionedFile)
df2.createOrReplaceTempView('t2')

spark.sql("select * from t1 join t2 on t1.keycolumn = t2.keycolumn where c = 'somevalue'")

What will be the order of execution? Will partitioning the data reduce the query time?
Partitioning the data helps reduce query time because Spark only needs to read the relevant partitions instead of scanning the entire dataset. Since you are joining a partitioned DataFrame with another DataFrame and filtering on the partitioning column, the filter is applied as a partition filter (partition pruning), so it will reduce the execution time; see the sketch below.
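For reference, here is a minimal, self-contained sketch (hypothetical paths and sample data, not your actual dataset) showing the effect: after writing with partitionBy("c"), a filter on c shows up as a PartitionFilter on the scan node, so only the matching directory is listed and read.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data with a key column, a value column, and the partitioning column c
data = [(1, "a", "2024-01-01"), (2, "b", "2024-01-02")]
df = spark.createDataFrame(data, ["keycolumn", "value", "c"])

# partitionBy("c") lays the files out as .../c=2024-01-01/, .../c=2024-01-02/, ...
df.write.mode("overwrite").partitionBy("c").parquet("/tmp/partitioned_data")

# The filter on c becomes a PartitionFilter: only the matching directory is read
pruned = spark.read.parquet("/tmp/partitioned_data").filter("c = '2024-01-01'")
pruned.explain(True)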
I agree with @Steven. You can check the query plan in the Spark UI, or by calling result.explain(True):
result = spark.sql("SELECT * FROM t1 JOIN t2 ON t1.keycolumn = t2.keycolumn WHERE c = '2024-01-01'")
result.explain(True)
Results:
== Optimized Logical Plan ==
Join Inner, (keycolumn#790L = keycolumn#796L)
:- Filter ((isnotnull(c#792) AND isnotnull(keycolumn#790L)) AND (c#792 = 2024-01-01))
: +- Relation [keycolumn#790L,value#791,c#792] parquet
+- Filter isnotnull(keycolumn#796L)
+- Relation [keycolumn#796L,other_value#797] parquet
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
ColumnarToRow
+- PhotonResultStage
+- PhotonBroadcastHashJoin [keycolumn#790L], [keycolumn#796L], Inner, BuildLeft, false, true
:- PhotonShuffleExchangeSource
: +- PhotonShuffleMapStage
: +- PhotonShuffleExchangeSink SinglePartition
: +- PhotonScan parquet [keycolumn#790L,value#791,c#792] DataFilters: [isnotnull(keycolumn#790L)], DictionaryFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/partitioned_data], OptionalDataFilters: [], PartitionFilters: [isnotnull(c#792), (c#792 = 2024-01-01)], ReadSchema: struct<keycolumn:bigint,value:string>, RequiredDataFilters: [isnotnull(keycolumn#790L)]
+- PhotonScan parquet [keycolumn#796L,other_value#797] DataFilters: [isnotnull(keycolumn#796L)], DictionaryFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/non_partitioned_data], OptionalDataFilters: [hashedrelationcontains(keycolumn#796L)], PartitionFilters: [], ReadSchema: struct<keycolumn:bigint,other_value:string>, RequiredDataFilters: [isnotnull(keycolumn#796L)]
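The plan answers both questions. Catalyst pushes the filter on c below the join (see the Optimized Logical Plan), so at scan time the partitioned side shows PartitionFilters: [isnotnull(c#792), (c#792 = 2024-01-01)] and only the c=2024-01-01 directory is read, while the non-partitioned side has PartitionFilters: [] and is scanned in full before the broadcast hash join. As a small sketch (assuming the same temp views as above), Spark 3.0+ can also print this per-scan detail in a more readable form:

result = spark.sql("""
    SELECT * FROM t1
    JOIN t2 ON t1.keycolumn = t2.keycolumn
    WHERE c = '2024-01-01'
""")

# 'formatted' mode breaks out each scan's PushedFilters and PartitionFilters separately,
# which makes the partition pruning easier to spot than in the extended plan above
result.explain(mode="formatted")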