I want to Hive-partition my dataset, but I don't quite know how to ensure the file counts in the splits are sane. I know I should roughly aim for files that are around 128 MB in size.
How do I safely scale and control the row counts inside files of my Hive-partitioned dataset?
For this answer, I'll assume you already understand when you should and shouldn't use Hive-style partitioning, so I won't cover the background theory.
In this case, it's important not only to correctly calculate the number of files needed inside each split but also to repartition the dataset based on those calculations. Failing to repartition before writing out a Hive-style partitioned dataset can result in your job attempting to write millions of tiny files, which will kill your performance: each task writes its own file for every distinct partition value it holds, so, for example, 2,000 tasks each touching 1,000 distinct values can produce up to 2,000,000 files.
In our case, the strategy will be to create files that hold roughly N rows each, which keeps the size of each file bounded. We can't easily control the exact on-disk size of each file inside a split, but row counts are a good approximation.
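As a rough way to translate the ~128 MB target from the question into a row count, you can estimate an average serialized row size and divide. This is only a back-of-the-envelope sketch; the average_row_bytes figure is an assumption you'd measure on your own data, for example by dividing the on-disk size of an existing output file by its row count.
# Back-of-the-envelope sketch only: derive a rows-per-file target from a size target.
# 'average_row_bytes' is an assumption -- measure it on your own data.
target_file_bytes = 128 * 1024 * 1024   # ~128 MB, the target mentioned in the question
average_row_bytes = 512                 # hypothetical average serialized row size
rows_per_file_estimate = target_file_bytes // average_row_bytes  # 262144 with these numbers
For the toy dataset below, we'll simply hard-code ROWS_PER_FILE = 5 so the behaviour is easy to see.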
The approach is to create a synthetic column that describes which 'batch' (i.e. file) a row belongs to, repartition the final dataset on both the Hive split column and this synthetic column, and use that result on write. To make sure the synthetic column assigns each row to the proper batch, we need to determine the number of rows inside each Hive split and 'sprinkle' the rows of that split across the proper number of files.
In total, the strategy looks like this:
1. Count the rows in each Hive split.
2. Divide each count by the target rows-per-file to get the number of files that split needs.
3. Assign every row a synthetic file index within its split.
4. Repartition on the Hive split column plus the file index, then write out with Hive partitioning on the split column.
Let's start by considering the following dataset:
from pyspark.sql import types as T, functions as F, SparkSession
spark = SparkSession.builder.getOrCreate()
# Synthesize DataFrames
schema = T.StructType([
    T.StructField("col_1", T.StringType(), False),
    T.StructField("col_2", T.IntegerType(), False),
    T.StructField("col_3", T.StringType(), False),
    T.StructField("col_4", T.IntegerType(), False),
])
data = [
    {"col_1": "key_1", "col_2": 1, "col_3": "CREATE", "col_4": 0},
    {"col_1": "key_2", "col_2": 2, "col_3": "CREATE", "col_4": 0},
    {"col_1": "key_3", "col_2": 3, "col_3": "CREATE", "col_4": 0},
]
final_data = []
# Scale up a bit
for _ in range(10):
    final_data += data
df = spark.createDataFrame(final_data, schema)
df.show()
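As a quick optional sanity check: the synthetic DataFrame has 3 distinct keys repeated 10 times each, so with 5 rows per file every key should end up spread across 2 files.
# Optional sanity check on the synthetic data: 3 keys x 10 repetitions = 30 rows.
assert df.count() == 30
assert all(row["count"] == 10 for row in df.groupBy("col_1").count().collect())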
Let's imagine the column we want to Hive split on is col_1, and we want 5 rows per file per value of col_1.
ROWS_PER_FILE = 5
# Per value in our Hive split, how many rows are there?
split_counts = df.groupBy("col_1").agg(F.count("col_1").alias("rows_in_this_split"))
# Add these counts to the main df
df_with_counts = df.join(split_counts, on="col_1")
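An optional aside, not part of the original flow: split_counts has only one row per distinct value of col_1, so on a real dataset it's usually tiny compared to df. If that's true for you, broadcasting it avoids shuffling the large side of the join; the result is identical.
# Optional: broadcast the small per-split counts so the big DataFrame isn't shuffled.
df_with_counts = df.join(F.broadcast(split_counts), on="col_1")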
df_with_index = df_with_counts.withColumn(  # Determine the number of files this split needs...
    "num_files_unrounded",
    F.col("rows_in_this_split") / F.lit(ROWS_PER_FILE)
).withColumn(  # ...and round it up to a whole number of files
    "num_files",
    F.ceil(F.col("num_files_unrounded")).cast("int")
).withColumn(
    "file_index",  # Pick a random value between 0 and the number of files...
    F.rand() * F.col("num_files")
).withColumn(
    "index",  # ...and truncate it down to an integer file index
    F.floor(F.col("file_index")).cast("int")
)
df_with_index.show()
"""
+-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
|col_1|col_2| col_3|col_4|rows_in_this_split|num_files_unrounded|num_files| file_index|index|
+-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
|key_1| 1|CREATE| 0| 10| 2.0| 2| 0.92294281966342| 0|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 0.7701823230466494| 0|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 0.7027155114438342| 0|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 0.2386678474259014| 0|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 0.983665114675822| 0|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 0.9674556368778833| 0|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 1.0727574871222592| 1|
|key_1| 1|CREATE| 0| 10| 2.0| 2|0.07142743481376246| 0|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 1.0401870580895767| 1|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 1.0915212267807561| 1|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 0.5097131383965849| 0|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 0.1837310991545238| 0|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 0.3142077066468343| 0|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 1.330191792519476| 1|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 1.5802012613480614| 1|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 1.1701764577368479| 1|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 0.9786522146923651| 0|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 0.5304094894753706| 0|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 1.2317743611604448| 1|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 1.867430955808408| 1|
+-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
"""
Now that we know which file index each row belongs to, we need to repartition before writing out.
# Show the counts per unique combination of Hive split column and file index
split_counts = (
    df_with_index
    .groupBy("col_1", "index")
    .agg(F.count("*").alias("row_count"))
    .orderBy("col_1", "index")
)
split_counts.show()
"""
+-----+-----+---------+
|col_1|index|row_count|
+-----+-----+---------+
|key_1| 0| 7|
|key_1| 1| 3|
|key_2| 0| 5|
|key_2| 1| 5|
|key_3| 0| 5|
|key_3| 1| 5|
+-----+-----+---------+
"""
# This number of unique (col_1, index) combinations is what we will repartition into
number_distinct_splits = split_counts.count()
final_write_df = df_with_index.repartition(number_distinct_splits, "col_1", "index")
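If you want to convince yourself the repartition behaved, spark_partition_id() (a Spark built-in) tells you which physical partition each row landed in. Because repartition on columns uses hash partitioning, a couple of (col_1, index) combinations can share a partition, in which case you simply get slightly fewer, larger files.
# Optional check: row counts per physical Spark partition after the repartition.
final_write_df.withColumn("part_id", F.spark_partition_id()) \
    .groupBy("part_id") \
    .count() \
    .orderBy("part_id") \
    .show()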
Now, on write-out, make sure your write options include partition_cols=["col_1"], and voilà!
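For reference, if you're writing with the vanilla Spark DataFrameWriter rather than a wrapper that accepts partition_cols, the equivalent is partitionBy (a sketch; the output path is a placeholder):
# Plain Spark writer equivalent; "/path/to/output" is a placeholder.
final_write_df.write.mode("overwrite").partitionBy("col_1").parquet("/path/to/output")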
I'd highly recommend reading this post as well, to make sure you understand exactly why the repartition before write-out is necessary.