pysparkpalantir-foundryhive-partitionsfoundry-code-repositoriesfoundry-code-workbooks

In Foundry, how can I Hive partition with only 1 parquet file per value?


I'm looking to improve the performance on running filtering logic. To accomplish this, the idea is to do hive partitioning setting by setting the partition column to a column in the dataset (called splittable_column).

I checked and the cardinality of splittable column is low, and if I subset each value from splitting_column, the end result is a 800MB parquet file.

If the cardinality of my dataset is 3, my goal is to have the data laid out like:

spark/splittable_column=Value A/part-00000-abc.c000.snappy.parquet  
spark/splittable_column=Value B/part-00000-def.c000.snappy.parquet  
spark/splittable_column=Value C/part-00000-ghi.c000.snappy.parquet  

When I run my_output_df.write_dataframe(df_with_logic,partition_cols=["splittable_column"]), and look at the results, I see many files in the KB range within the directory, which is going to cause a large overhead during reading. For example my dataset looks like:

spark/splittable_column=Value A/part-00000-abc.c000.snappy.parquet
spark/splittable_column=Value A/part-00001-abc.c000.snappy.parquet  
spark/splittable_column=Value A/part-00002-abc.c000.snappy.parquet  
...
spark/splittable_column=Value A/part-00033-abc.c000.snappy.parquet  
spark/splittable_column=Value B/part-00000-def.c000.snappy.parquet
...
spark/splittable_column=Value B/part-00030-def.c000.snappy.parquet
spark/splittable_column=Value C/part-00000-ghi.c000.snappy.parquet
...
spark/splittable_column=Value C/part-00032-ghi.c000.snappy.parquet
etc.

From the documentation I understand that:

you will have at least one output file for each unique value in your partition column

How do I configure the transform that I get at most 1 output file per task during Hive partitioning?


Solution

  • If you look at the input data, you may notice that the data is split across multiple parquet files. When you look at the build report for just running my_output_df.write_dataframe(df_with_logic,partition_cols=["splittable_column"]), you may notice that there is no shuffle in the query plan.

    IE, you would see:

    Graph:

    Scan
    Project
    BasicStats
    Execute
    

    Plan:

    FoundrySaveDatasetCommand `ri.foundry.main.transaction.xxx@master`.`ri.foundry.main.dataset.yyy`, ErrorIfExists, [column1 ... 17 more fields],
    +- BasicStatsNode `ri.foundry.main.transaction.zzz@master`.`ri.foundry.main.dataset.aaa`
       +- Project [splitable_column ... 17 more fields]
          +- Relation !ri.foundry.main.transaction.xxx:master.ri.foundry.main.dataset.yyy[splittable_column... 17 more fields] parquet
    

    In this example, it only took 1 minute to run because there was no shuffle.

    Now if you repartition on the column you are going to partition by:

    df_with_logic = df_with_logic.repartition("splittable_column")
    my_output_df.write_dataframe(df_with_logic,partition_cols=["splittable_column"]
    

    It will force an Exchange, ie RepartitionByExpression on splittable_column, which will take longer (15 minutes in my case) but the data will be split the way I wanted:

    spark/splittable_column=Value A/part-00000-abc.c000.snappy.parquet  
    spark/splittable_column=Value B/part-00000-def.c000.snappy.parquet  
    spark/splittable_column=Value C/part-00000-ghi.c000.snappy.parquet  
    

    Graph:

    Scan
    Exchange
    Project
    BasicStats
    Execute
    

    Plan:

    ri.foundry.main.transaction.xxx@master`.`ri.foundry.main.dataset.yyy`, ErrorIfExists, [column1 ... 17 more fields],
    +- BasicStatsNode `ri.foundry.main.transaction.zzz@master`.`ri.foundry.main.dataset.aaa`
       +- Project [splitable_column ... 17 more fields]
          +- RepartitionByExpression [splittable_column], 1
              +- Relation !ri.foundry.main.transaction.xxx:master.ri.foundry.main.dataset.yyy[splittable_column... 17 more fields] parquet