sqlpysparkdatabricksaws-databricks

Keep partition number reasonable, but partition dataframe such that values of a high cardinality column are in same partition


Tagging "sql" too because an answer that derives a column to partition on with sparkSql would be fine.

Summary: Say I have 3B distinct values of AlmostUID. I don't want 3B partitions, say I want 1000 partitions. But I want all like values of AlmostUID to be on the same partition.

Input:

AlmostUID LoadMonth
1 April
1 May
2 April
3 June
4 June
4 August
5 September

Expected:

"GoodPartition" is good because the records with AlmostUID (1) are on the same partition. Records with AlmostUID (4) are on the same partition.

"BadPartition" is bad because AlmostUID (1) is mapped to multiple different partitions.

AlmostUID LoadMonth GoodPartition BadPartition
1 April 1 1
1 May 1 2
2 April 1 1
3 June 2 1
4 June 2 2
4 August 2 2
5 September 2 2

Solution

  • Use bucketing on the column with high cardinality.

    df.write.format('parquet') \
      .mode('overwrite') \
      .bucketBy(1000, 'AlmostUID') \
      .save('path_to_save_data')  
    

    The above snippet will do a bucket by operation and uses a hash-based algorithm to determine the bucket assignment for each row based on the specified column. This algorithm involves applying a hash function to the values in the bucketing column and then mapping the hashed values to a specific bucket.

    The hash-based algorithm is deterministic, meaning that given the same input values, it will always produce the same bucket assignment. This property ensures that the data is consistently bucketed across multiple runs or nodes in a distributed environment.

    When performing bucketing in PySpark, the hash algorithm evenly distributes the rows across the specified number of buckets. The goal is to achieve a roughly equal distribution of data across the buckets, helping with data skew and improving query performance for operations like joins and aggregations.

    It's important to note that the hash-based bucketing algorithm in PySpark assumes a uniform distribution of data in the bucketing column. If the data is not uniformly distributed, it may result in uneven bucket sizes and potential performance issues. In such cases, you might need to preprocess the data to ensure a more balanced distribution before applying bucketing.