I am running Spark 3.5.3 with Python 3.12.3. I have created a dataframe like this:
df = spark.createDataFrame(data=[(1, "Tom"), (1, "Mitchell"), (2, "Alice"), (0, "Max"), (3, "Bob"), (3, "Amy"), (4, "Sydney")], schema="pos: int, name: string")
I repartition the dataframe into 5 partitions based on the pos column using new_df1 = df.repartition(5, "pos"), expecting each partition to contain rows with a single pos value. However, when I print the data by partition, partition 1 contains rows with different pos values, as seen below:
Partition: 0
Partition: 1
Row 0:Row(pos=0, name='Max')
Row 1:Row(pos=3, name='Bob')
Row 2:Row(pos=3, name='Amy')
Partition: 2
Row 3:Row(pos=4, name='Sydney')
Partition: 3
Row 4:Row(pos=1, name='Tom')
Row 5:Row(pos=1, name='Mitchell')
Partition: 4
Row 6:Row(pos=2, name='Alice')
I also checked the hash value and partition number for the pos column by referring to the code here. Code below for reference.
from pyspark.sql import functions as sf
from pyspark.rdd import portable_hash
udf_portable_hash = sf.udf(lambda pos: portable_hash(pos))
df = df.withColumn("Hash#", udf_portable_hash(df.pos))
df = df.withColumn("Partition#", df["Hash#"] % 5)
and the result is
+---+--------+-----+----------+
|pos| name|Hash#|Partition#|
+---+--------+-----+----------+
| 1| Tom| 1| 1.0|
| 1|Mitchell| 1| 1.0|
| 2| Alice| 2| 2.0|
| 0| Max| 0| 0.0|
| 3| Bob| 3| 3.0|
| 3| Amy| 3| 3.0|
| 4| Sydney| 4| 4.0|
+---+--------+-----+----------+
Based on this result, Bob and Amy should be in a separate partition from Max, but that does not seem to be the case when running repartition.
How can I enforce that rows with a given pos value end up in their own partition?
Edit: I also tried repartitionByRange(), but it did not solve the issue either. Output shared below:
Partition 0:
Row 0:Row(pos=1, name='Tom')
Row 1:Row(pos=1, name='Mitchell')
Row 2:Row(pos=0, name='Max')
Partition 1:
Row 3:Row(pos=2, name='Alice')
Partition 2:
Row 4:Row(pos=3, name='Bob')
Row 5:Row(pos=3, name='Amy')
Partition 3:
Row 6:Row(pos=4, name='Sydney')
Partition 4:
As I mentioned in my comments, repartitionByRange does not guarantee a perfect distribution of keys across partitions, since Spark "guesses" the ranges via sampling/approximation. "Perfect" in the sense that there is a one-to-one relation between keys and partition numbers. However, the number of partitions can still be picked by trial and error such that each key gets its own partition. Here is a demonstration with the df from your example :)
>>>
>>> spark.version
'3.3.2.3.3.7190.0-91'
>>> df = spark.createDataFrame(data=[(1, "Tom"), (1, "Mitchell"), (2, "Alice"), (0, "Max"), (3, "Bob"), (3, "Amy"), (4, "Sydney")], schema="pos: int, name: string")
>>> from pyspark.sql.functions import spark_partition_id
>>> dfr = df.repartitionByRange(7, "pos")
>>> dfr.withColumn("pno", spark_partition_id()).orderBy("pno").show(truncate=False)
+---+--------+---+
|pos|name |pno|
+---+--------+---+
|0 |Max |0 |
|1 |Tom |1 |
|1 |Mitchell|1 |
|2 |Alice |2 |
|3 |Amy |3 |
|3 |Bob |3 |
|4 |Sydney |4 |
+---+--------+---+
>>>
As others mentioned in their comments, the issue sounds like an XY problem though... Why would you need each partition to hold exactly one key? Wouldn't knowing that all rows for a given key reside in one partition be sufficient?