I'm looking to improve the performance of my filtering logic. To accomplish this, the idea is to use Hive partitioning by setting the partition column to a column in the dataset (called splittable_column).
I checked that the cardinality of splittable_column is low, and if I subset the data to a single value of splittable_column, the end result is an 800 MB parquet file.
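As a sanity check on that claim, the distinct values and row counts per value can be inspected before writing. A minimal sketch, assuming df_with_logic is the filtered dataframe from the transform and that this runs inside an active Spark session:

from pyspark.sql import functions as F

# Count rows per distinct value of the partition column; a handful of rows
# here confirms the low-cardinality assumption.
per_value_counts = (
    df_with_logic
    .groupBy("splittable_column")
    .agg(F.count("*").alias("row_count"))
)
per_value_counts.show()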
If the cardinality of splittable_column is 3, my goal is to have the data laid out like:
spark/splittable_column=Value A/part-00000-abc.c000.snappy.parquet
spark/splittable_column=Value B/part-00000-def.c000.snappy.parquet
spark/splittable_column=Value C/part-00000-ghi.c000.snappy.parquet
When I run my_output_df.write_dataframe(df_with_logic, partition_cols=["splittable_column"]) and look at the results, I see many files in the KB range within each directory, which is going to cause a large overhead during reading (the full transform is sketched after the listing below). For example, my dataset looks like:
spark/splittable_column=Value A/part-00000-abc.c000.snappy.parquet
spark/splittable_column=Value A/part-00001-abc.c000.snappy.parquet
spark/splittable_column=Value A/part-00002-abc.c000.snappy.parquet
...
spark/splittable_column=Value A/part-00033-abc.c000.snappy.parquet
spark/splittable_column=Value B/part-00000-def.c000.snappy.parquet
...
spark/splittable_column=Value B/part-00030-def.c000.snappy.parquet
spark/splittable_column=Value C/part-00000-ghi.c000.snappy.parquet
...
spark/splittable_column=Value C/part-00032-ghi.c000.snappy.parquet
etc.
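For context, the transform is roughly the following. This is a minimal sketch in which the Input/Output paths and the compute function name are placeholders, and the filtering logic is elided:

from transforms.api import transform, Input, Output

@transform(
    my_output_df=Output("/path/to/output_dataset"),  # placeholder path
    my_input_df=Input("/path/to/input_dataset"),     # placeholder path
)
def compute(my_output_df, my_input_df):
    df_with_logic = my_input_df.dataframe()  # filtering logic goes here
    my_output_df.write_dataframe(
        df_with_logic,
        partition_cols=["splittable_column"],
    )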
From the documentation I understand that:
you will have at least one output file for each unique value in your partition column
How do I configure the transform so that I get at most one output file per partition value when Hive partitioning?
If you look at the input data, you may notice that it is already split across multiple parquet files. When you look at the build report for just running my_output_df.write_dataframe(df_with_logic, partition_cols=["splittable_column"]), you may notice that there is no shuffle in the query plan. I.e., you would see:
Graph:
Scan
Project
BasicStats
Execute
Plan:
FoundrySaveDatasetCommand `ri.foundry.main.transaction.xxx@master`.`ri.foundry.main.dataset.yyy`, ErrorIfExists, [column1 ... 17 more fields],
+- BasicStatsNode `ri.foundry.main.transaction.zzz@master`.`ri.foundry.main.dataset.aaa`
+- Project [splittable_column ... 17 more fields]
+- Relation !ri.foundry.main.transaction.xxx:master.ri.foundry.main.dataset.yyy[splittable_column... 17 more fields] parquet
In this example, it only took 1 minute to run because there was no shuffle: each input task simply wrote its own output file into every partition directory for the values it contained, which is why there are so many small files per value.
Now if you repartition on the column you are going to partition by:
df_with_logic = df_with_logic.repartition("splittable_column")
my_output_df.write_dataframe(df_with_logic, partition_cols=["splittable_column"])
it will force an Exchange (i.e. a RepartitionByExpression on splittable_column), which will take longer (15 minutes in my case), but the data will be split the way I wanted:
spark/splittable_column=Value A/part-00000-abc.c000.snappy.parquet
spark/splittable_column=Value B/part-00000-def.c000.snappy.parquet
spark/splittable_column=Value C/part-00000-ghi.c000.snappy.parquet
Graph:
Scan
Exchange
Project
BasicStats
Execute
Plan:
FoundrySaveDatasetCommand `ri.foundry.main.transaction.xxx@master`.`ri.foundry.main.dataset.yyy`, ErrorIfExists, [column1 ... 17 more fields],
+- BasicStatsNode `ri.foundry.main.transaction.zzz@master`.`ri.foundry.main.dataset.aaa`
+- Project [splittable_column ... 17 more fields]
+- RepartitionByExpression [splittable_column], 1
+- Relation !ri.foundry.main.transaction.xxx:master.ri.foundry.main.dataset.yyy[splittable_column... 17 more fields] parquet
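Putting it together, a minimal sketch of the adjusted transform (same placeholder paths and elided filtering logic as in the question; the only change is the repartition before the write):

from transforms.api import transform, Input, Output

@transform(
    my_output_df=Output("/path/to/output_dataset"),  # placeholder path
    my_input_df=Input("/path/to/input_dataset"),     # placeholder path
)
def compute(my_output_df, my_input_df):
    df_with_logic = my_input_df.dataframe()  # filtering logic goes here

    # Hash-partition on the column used for Hive partitioning so that each
    # distinct value ends up in exactly one task, and therefore one file.
    df_with_logic = df_with_logic.repartition("splittable_column")

    my_output_df.write_dataframe(
        df_with_logic,
        partition_cols=["splittable_column"],
    )

Because repartition("splittable_column") hashes every row with a given value into the same shuffle partition, each partition value is written by a single task; the trade-off is the extra Exchange, which is where the additional runtime comes from.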