apache-spark, pyspark, parquet, partitioning

How to specify file size using repartition() in Spark


I'm using PySpark and I have a large data source that I want to repartition, specifying the file size per partition explicitly.

I know that using the repartition(500) function will split my Parquet data into 500 files of almost equal size. The problem is that new data gets added to this data source every day. On some days there might be a large input, and on other days the input might be smaller. So when looking at the partition file size distribution over a period of time, it varies between 200KB and 700KB per file.
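For reference, the fixed-file-count approach described above looks roughly like this (df and output_path are placeholders, not names from my actual job):

    # Sketch of the fixed-count approach: always 500 output files,
    # so the per-file size varies with each day's input volume.
    df.repartition(500).write.parquet(output_path)  # output_path is a placeholder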

I was thinking of specifying a maximum size per partition so that I get more or less the same file size per file each day, irrespective of the number of files. This will help me when running jobs on this large dataset later on, to avoid skewed executor times, shuffle times, etc.

Is there a way to specify this using the repartition() function or while writing the DataFrame to Parquet?


Solution

  • You could consider writing your result with the parameter maxRecordsPerFile.

    storage_location = "..."  # your output path (left unspecified here)
    estimated_records_with_desired_size = 2000
    result_df.write \
        .option("maxRecordsPerFile", estimated_records_with_desired_size) \
        .parquet(storage_location, compression="snappy")
    

    To determine what to use for maxRecordsPerFile, look at the total size of the data/files in a partition (i.e. the size on disk/S3) and divide it by the number of records in that partition. Getting the record count is a simple COUNT(*) with GROUP BY partition or WHERE partition = ....
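    For example, assuming the data is partitioned by a date column (the names result_df and "day" below are placeholders, not from the original post), a rough sketch of the per-partition record count could look like this:

    # Sketch: count records per day-partition; compare each count with that
    # partition's total file size on disk/S3 to estimate bytes per record.
    from pyspark.sql import functions as F

    records_per_day = (
        result_df
            .groupBy("day")                        # assumed partition column
            .agg(F.count("*").alias("n_records"))
    )
    records_per_day.show()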

    So let's say your data is partitioned per day, and on a given day you have 100,000 records in total, with a total file size in S3 of 100MB (e.g. two files of 50,000KB each); that's 1KB per record on average. So if you want your files to be about 10MB, put 10,000 records per file (maxRecordsPerFile = 10000).
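    Spelled out as a quick back-of-the-envelope calculation (the figures are just the ones from the example above, not measured values):

    total_bytes = 100 * 1024 * 1024        # ~100MB for the day's partition
    total_records = 100_000                # COUNT(*) for that day
    target_file_bytes = 10 * 1024 * 1024   # desired ~10MB files

    avg_record_bytes = total_bytes / total_records                     # ~1KB per record
    max_records_per_file = int(target_file_bytes / avg_record_bytes)
    print(max_records_per_file)            # -> 10000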