pyspark palantir-foundry foundry-code-repositories foundry-python-transform

How do I control the file counts inside my Hive-partitioned dataset?


I want to Hive-partition my dataset, but I don't quite know how to ensure the file counts in the splits are sane. I know I should roughly aim for files that are 128MB in size.

How do I safely scale and control the row counts inside files of my Hive-partitioned dataset?


Solution

  • For this answer, I'll assume you already understand when you should and shouldn't use Hive-style partitioning, so I won't be covering the backing theory.

    In this case, it's important not only to correctly calculate the number of files needed inside our splits, but also to repartition our dataset based on those calculations. Failing to repartition before write-out on a Hive-style partitioned dataset may result in your job attempting to write out millions of tiny files, which will kill your performance.

    In our case, the strategy will be to aim for roughly N rows per file, which bounds the approximate size of each file. We can't easily limit the exact size of each file inside the splits, but row counts are a good proxy.
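
    To pick a concrete value of N in practice, one rough approach is to divide a target file size by an estimated average serialized row size. The ~2KB-per-row figure below is purely an assumption; estimate it for your own data, for example from an existing output's size on disk divided by its row count. (The worked example below uses a deliberately tiny value of 5 just to keep the toy data readable.)

    # Rough sketch for choosing ROWS_PER_FILE from a target file size.
    # AVG_ROW_BYTES is an assumed value; measure it on your own data
    # (dataset size on disk / total row count) before relying on it.
    TARGET_FILE_BYTES = 128 * 1024 * 1024   # aim for roughly 128MB files
    AVG_ROW_BYTES = 2 * 1024                # assumed ~2KB per written-out row

    ROWS_PER_FILE = TARGET_FILE_BYTES // AVG_ROW_BYTES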

    The methodology will be to create a synthetic column that describes which 'batch' a row belongs to, repartition the final dataset on both the Hive split column and this synthetic column, and use this result on write.

    To ensure our synthetic column indicates the proper batch a row belongs to, we need to determine the number of rows inside each Hive split and 'sprinkle' those rows into the proper number of files.

    The strategy in total will look something like this:

    1. Determine number of rows per Hive value
    2. Join this count against main dataframe
    3. Determine number of files in split by dividing row count per split by rows per file
    4. Create random index between 0 and the file count, essentially 'picking' the file the row will belong to
    5. Calculate number of unique combinations of Hive split columns and our synthetic column
    6. Repartition output dataset over both Hive column and synthetic column into the number of unique combinations, i.e. one file per combination, which is exactly what we want

    Let's start by considering the following dataset:

    from pyspark.sql import types as T, functions as F, SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    
    
    # Synthesize DataFrames
    schema = T.StructType([
      T.StructField("col_1", T.StringType(), False),
      T.StructField("col_2", T.IntegerType(), False),
      T.StructField("col_3", T.StringType(), False),
      T.StructField("col_4", T.IntegerType(), False),
    ])
    data = [
      {"col_1": "key_1", "col_2": 1, "col_3": "CREATE", "col_4": 0},
      {"col_1": "key_2", "col_2": 2, "col_3": "CREATE", "col_4": 0},
      {"col_1": "key_3", "col_2": 3, "col_3": "CREATE", "col_4": 0},
    ]
    
    final_data = []
    # Scale up a bit
    for _ in range(10):
        final_data += data
    
    df = spark.createDataFrame(final_data, schema)
    df.show()
    

    Let's imagine the column we want to Hive split on is col_1, and we want 5 rows per file per value of col_1.

    ROWS_PER_FILE = 5
    
    # Per value in our Hive split, how many rows are there?
    split_counts = df.groupBy("col_1").agg(F.count("col_1").alias("rows_in_this_split"))
    
    # Add these counts to the main df
    df_with_counts = df.join(split_counts, on="col_1")
    
    
    df_with_index = df_with_counts.withColumn(
        "num_files_unrounded",                  # Fractional number of files this split needs
        F.col("rows_in_this_split") / F.lit(ROWS_PER_FILE)
    ).withColumn(
        "num_files",                            # Round up so every row has a file to land in
        F.ceil(F.col("num_files_unrounded")).cast("int")
    ).withColumn(
        "file_index",                           # Pick a random value between 0 and the number of files...
        F.rand() * F.col("num_files")
    ).withColumn(
        "index",                                # Truncate down to an integer file index
        F.floor(F.col("file_index")).cast("int")
    )
    
    df_with_index.show()
    """
    +-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
    |col_1|col_2| col_3|col_4|rows_in_this_split|num_files_unrounded|num_files|         file_index|index|
    +-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
    |key_1|    1|CREATE|    0|                10|                2.0|        2|   0.92294281966342|    0|
    |key_1|    1|CREATE|    0|                10|                2.0|        2| 0.7701823230466494|    0|
    |key_1|    1|CREATE|    0|                10|                2.0|        2| 0.7027155114438342|    0|
    |key_1|    1|CREATE|    0|                10|                2.0|        2| 0.2386678474259014|    0|
    |key_1|    1|CREATE|    0|                10|                2.0|        2|  0.983665114675822|    0|
    |key_1|    1|CREATE|    0|                10|                2.0|        2| 0.9674556368778833|    0|
    |key_1|    1|CREATE|    0|                10|                2.0|        2| 1.0727574871222592|    1|
    |key_1|    1|CREATE|    0|                10|                2.0|        2|0.07142743481376246|    0|
    |key_1|    1|CREATE|    0|                10|                2.0|        2| 1.0401870580895767|    1|
    |key_1|    1|CREATE|    0|                10|                2.0|        2| 1.0915212267807561|    1|
    |key_2|    2|CREATE|    0|                10|                2.0|        2| 0.5097131383965849|    0|
    |key_2|    2|CREATE|    0|                10|                2.0|        2| 0.1837310991545238|    0|
    |key_2|    2|CREATE|    0|                10|                2.0|        2| 0.3142077066468343|    0|
    |key_2|    2|CREATE|    0|                10|                2.0|        2|  1.330191792519476|    1|
    |key_2|    2|CREATE|    0|                10|                2.0|        2| 1.5802012613480614|    1|
    |key_2|    2|CREATE|    0|                10|                2.0|        2| 1.1701764577368479|    1|
    |key_2|    2|CREATE|    0|                10|                2.0|        2| 0.9786522146923651|    0|
    |key_2|    2|CREATE|    0|                10|                2.0|        2| 0.5304094894753706|    0|
    |key_2|    2|CREATE|    0|                10|                2.0|        2| 1.2317743611604448|    1|
    |key_2|    2|CREATE|    0|                10|                2.0|        2|  1.867430955808408|    1|
    +-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
    """
    

    Now that we know which file index each row belongs to, we need to repartition before write-out.

    # Show the counts per unique combination of Hive split column and file index
    split_counts = df_with_index.groupBy("col_1", "index").agg(F.count("*").alias("row_count")).orderBy("col_1", "index")
    split_counts.show()
    """
    +-----+-----+---------+
    |col_1|index|row_count|
    +-----+-----+---------+
    |key_1|    0|        7|
    |key_1|    1|        3|
    |key_2|    0|        5|
    |key_2|    1|        5|
    |key_3|    0|        5|
    |key_3|    1|        5|
    +-----+-----+---------+
    """
    number_distinct_splits = split_counts.count()    # This number of unique combinations is what we will repartition into
    
    final_write_df = df_with_index.repartition(number_distinct_splits, ["col_1", "index"])
    

    Now, on write-out, make sure your write options include partition_cols=["col_1"], and voilà!
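
    In a Foundry Python transform, the write-out looks roughly like the sketch below. The dataset paths are placeholders, and the transform body is reduced to a stand-in where the counting / indexing / repartitioning steps above would go; you'll probably also want to drop the helper columns (rows_in_this_split, num_files_unrounded, num_files, file_index) before writing.

    from transforms.api import transform, Input, Output


    @transform(
        output=Output("/Project/folder/my_hive_partitioned_output"),  # placeholder path
        source=Input("/Project/folder/my_source_dataset"),            # placeholder path
    )
    def compute(output, source):
        df = source.dataframe()

        # ...apply the steps shown above here, producing final_write_df
        # (and drop the helper columns before writing)...
        final_write_df = df  # stand-in so this sketch is self-contained

        output.write_dataframe(final_write_df, partition_cols=["col_1"])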

    I'd highly recommend reading this post as well, to make sure you understand exactly why the repartition is necessary before write-out.
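
    As a final side note: because F.rand() only approximates an even spread, individual files can end up somewhat over ROWS_PER_FILE (key_1 above got a 7/3 split instead of 5/5). If you need a hard cap on rows per file, a deterministic variant is to number the rows within each Hive split with a window function and derive the file index by integer division. A minimal sketch, assuming col_2 gives you a usable ordering:

    from pyspark.sql import functions as F, Window

    # Number the rows within each Hive split, then bucket every
    # ROWS_PER_FILE consecutive rows into the same file index.
    w = Window.partitionBy("col_1").orderBy("col_2")
    df_with_index = df.withColumn(
        "index",
        F.floor((F.row_number().over(w) - F.lit(1)) / F.lit(ROWS_PER_FILE)).cast("int")
    )

    # From here, counting the distinct ("col_1", "index") combinations and
    # repartitioning proceed exactly as above.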