scala · apache-spark · rdd · partitioner

How to properly apply HashPartitioner before a join in Spark?


To reduce shuffling during the joining of two RDDs, I decided to partition them using HashPartitioner first. Here is how I do it. Am I doing it correctly, or is there a better way to do this?

import org.apache.spark.HashPartitioner

val rddA = ...
val rddB = ...

val numOfPartitions = rddA.getNumPartitions

// Partition both RDDs with the same partitioner before joining.
val rddApartitioned = rddA.partitionBy(new HashPartitioner(numOfPartitions))
val rddBpartitioned = rddB.partitionBy(new HashPartitioner(numOfPartitions))

val rddAB = rddApartitioned.join(rddBpartitioned)

Solution

  • To reduce shuffling during the joining of two RDDs,

    It is a surprisingly common misconception that repartitioning reduces or even eliminates shuffles. It doesn't. Repartitioning is a shuffle in its purest form; on its own it saves no time, bandwidth, or memory.

    The rationale behind applying a partitioner proactively is different: it lets you shuffle once and then reuse that state to perform multiple by-key operations without additional shuffles (though, as far as I am aware, not necessarily without additional network traffic, since co-partitioning does not imply co-location, except in cases where the shuffles occurred within a single action).

    So your code is correct, but if you join only once it doesn't buy you anything.
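The mechanics behind that answer can be sketched in plain Scala. This is a sketch that mirrors `HashPartitioner`'s documented placement rule (non-negative `hashCode` modulo the partition count), not actual Spark API calls, and the sample keys and data are made up for illustration:

```scala
// Sketch of HashPartitioner's placement rule: a key lands in partition
// nonNegativeMod(key.hashCode, numPartitions).
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

def partitionIndex(key: Any, numPartitions: Int): Int =
  nonNegativeMod(key.hashCode, numPartitions)

val n = 4
val a = Seq("apple" -> 1, "pear" -> 2, "plum" -> 3)
val b = Seq("apple" -> "red", "plum" -> "purple")

// Co-partitioning: both sides use the same rule, so any shared key sits at
// the same partition index in both datasets...
val aByPart = a.groupBy { case (k, _) => partitionIndex(k, n) }
val bByPart = b.groupBy { case (k, _) => partitionIndex(k, n) }

// ...which means the join can proceed partition by partition; no key ever
// needs to move to a different partition.
val localJoin = (0 until n).flatMap { p =>
  for {
    (k, v)  <- aByPart.getOrElse(p, Seq.empty)
    (k2, w) <- bByPart.getOrElse(p, Seq.empty)
    if k == k2
  } yield k -> (v, w)
}.toMap
```

This partition-local layout is exactly the state that a second `join` or `reduceByKey` on `rddApartitioned` could reuse; with a single join, the one shuffle happens either way, just earlier (in `partitionBy`) instead of inside `join`.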