apache-spark pyspark

Pyspark dataframe repartitioning puts all data in one partition


I have a dataframe with schema as follows:

root
 |-- category_id: string (nullable = true)
 |-- article_title: string (nullable = true)

And data that looks likes this:

+-----------+--------------------+
|category_id|       article_title|
+-----------+--------------------+
|       1000|HP EliteOne 800 G...|
|       1000|ASUS  EB1501P ATM...|
|       1000|HP EliteOne 800 G...|
|          1|ASUS R557LA-XO119...|
|          1|HP EliteOne 800 G...|
+-----------+--------------------+

There are just two distinct category_id values, 1000 and 1.

I want to repartition by category_id and then run mapPartitions on each of the partitions.

p_df = df.repartition(2, "category_id")
p_df.rdd.mapPartitionsWithIndex(some_func)

But the data is not getting partitioned as expected: each partition should contain rows for only one category_id, yet one partition gets 0 records while the other gets all of them.
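As a toy sketch of why this can happen: Spark's DataFrame repartition uses Murmur3 hashing on the JVM, not the invented toy_hash below, but the principle is the same. The partition index is hash(key) % numPartitions, so two distinct keys can land in the same bucket, leaving the other bucket empty:

```python
def toy_hash(key):
    # Invented stand-in hash (sum of character codes); Spark really uses Murmur3.
    return sum(ord(c) for c in key)

def toy_partition(key, num_partitions=2):
    # Hash partitioning: bucket = hash(key) modulo the number of partitions.
    return toy_hash(key) % num_partitions

# Both keys happen to hash to an odd number, so both land in partition 1
# and partition 0 stays empty -- the same failure mode as in the question.
print(toy_partition("1000"))  # 1
print(toy_partition("1"))     # 1
```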

Why is this happening, and how can I fix it?

There is already a question on how the Spark partitioner works. My question is different: the answers there only explain how the partitioner works, whereas I am asking why this happens (which is answered already) and how to fix it.


Solution

  • The reason repartition puts all the data in one partition is explained by @Ramesh Maharjan in the answer above. More on hash partitioning here

    I was able to make the data go to different partitions by using a custom partitioner. I converted the RDD into a pair RDD of the form (category_id, row) and used the partitionBy method, passing in the number of partitions and the custom partitioner.

        categories = input_df.select("category_id").distinct().rdd.map(lambda r: r.category_id).collect()
        cat_idx = dict([(cat, idx) for idx, cat in enumerate(categories)])

        def category_partitioner(cid):
            return cat_idx[cid]

        # Pair each row with its category_id, then partition with one
        # partition per distinct category so keys can never collide.
        pair_rdd = input_df.rdd.map(lambda row: (row.category_id, row))
        p_rdd = pair_rdd.partitionBy(len(categories), category_partitioner)
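As a quick sanity check of the key-to-partition mapping, runnable without a Spark session (the category values are hard-coded here to match the sample data, whereas the snippet above collects them from the DataFrame):

```python
# Rebuild the category_id -> partition index mapping in plain Python.
categories = ["1000", "1"]  # what distinct().collect() returns for the sample data
cat_idx = dict([(cat, idx) for idx, cat in enumerate(categories)])

def category_partitioner(cid):
    return cat_idx[cid]

# Every distinct key gets its own partition index, so no two
# categories can end up in the same partition.
assignments = {cid: category_partitioner(cid) for cid in categories}
print(assignments)  # {'1000': 0, '1': 1}
```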