apache-spark, pyspark, apache-spark-sql

Spark: What is the difference between repartition and repartitionByRange?


I went through the documentation here: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html

It says:

    repartition(numPartitions, *cols)
    Returns a new DataFrame partitioned by the given partitioning expressions. The resulting DataFrame is hash partitioned.

    repartitionByRange(numPartitions, *cols)
    Returns a new DataFrame partitioned by the given partitioning expressions. The resulting DataFrame is range partitioned.

A previous question also mentions it. However, I still don't understand exactly how they differ and what the impact is of choosing one over the other.

More importantly, if repartition does hash partitioning, what impact does providing columns as its argument have?


Solution

  • I think it is best to look into the difference with some experiments.

    Test Dataframes

    For this experiment, I am using the following two Dataframes (I show the code in Scala, but the concepts are identical for the Python API):

    import spark.implicits._ // needed for toDF outside the spark-shell
    
    // Dataframe with one column "value" containing the 1000001 values from 0 to 1000000 (inclusive)
    val df = Seq(0 to 1000000: _*).toDF("value")
    
    // Dataframe with one column "value" containing the number 0 1000001 times, plus the numbers 5000, 10000 and 100000
    val df2 = Seq((0 to 1000000).map(_ => 0) :+ 5000 :+ 10000 :+ 100000: _*).toDF("value")
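
    A quick sanity check on the sizes (both ranges are inclusive, hence one value more than you might expect):

    df.count()  // 1000001: the values 0 to 1000000 inclusive
    df2.count() // 1000004: 1000001 zeros plus 5000, 10000 and 100000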
    

    Theory

    It is worth mentioning that for both methods, if numPartitions is not given, the Dataframe is partitioned into the number of partitions configured by spark.sql.shuffle.partitions in your Spark session, and the result can be coalesced by Adaptive Query Execution (available since Spark 3.x).
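
    For example (a minimal sketch; the value 8 is only for illustration):

    spark.conf.set("spark.sql.shuffle.partitions", "8")
    df.repartition(col("value")).rdd.getNumPartitions        // 8, unless AQE coalesces them
    df.repartitionByRange(col("value")).rdd.getNumPartitions // 8, unless AQE coalesces them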

    Test Setup

    Based on the given test data, I always apply the same code:

    import org.apache.spark.sql.functions._ // for spark_partition_id, col, count, min and max
    
    val testDf = df
        // here I will insert the partition logic
        .withColumn("partition", spark_partition_id()) // built-in function that returns the partition id of each row
        .groupBy(col("partition"))
        .agg(
          count(col("value")).as("count"),
          min(col("value")).as("min_value"),
          max(col("value")).as("max_value"))
        .orderBy(col("partition"))
    
    testDf.show(false)
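
    As a small convenience (a hypothetical helper, not part of the original code), the same check can be wrapped in a function so that each strategy below becomes a one-liner:

    import org.apache.spark.sql.DataFrame

    // Hypothetical helper: prints the partition layout of any Dataframe with a "value" column
    def showPartitionStats(data: DataFrame): Unit = data
      .withColumn("partition", spark_partition_id())
      .groupBy(col("partition"))
      .agg(
        count(col("value")).as("count"),
        min(col("value")).as("min_value"),
        max(col("value")).as("max_value"))
      .orderBy(col("partition"))
      .show(false)

    // e.g. showPartitionStats(df.repartition(4, col("value")))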
    

    Test Results

    df.repartition(4, col("value"))

    As expected, we get 4 partitions, and because the values in df range from 0 to 1000000, their hashes distribute the rows evenly across the partitions.

    +---------+------+---------+---------+
    |partition|count |min_value|max_value|
    +---------+------+---------+---------+
    |0        |249911|12       |1000000  |
    |1        |250076|6        |999994   |
    |2        |250334|2        |999999   |
    |3        |249680|0        |999998   |
    +---------+------+---------+---------+
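
    As a mental model, hash partitioning assigns each row roughly as sketched below. This is a simplification: Spark actually applies a Murmur3-based hash to the partitioning expressions, and Int.hashCode here is only a stand-in to illustrate the mechanics.

    // Simplified sketch of hash partitioning (not Spark's exact implementation)
    def hashPartition(value: Int, numPartitions: Int): Int = {
      val h = value.hashCode // stand-in for Spark's Murmur3-based hash
      ((h % numPartitions) + numPartitions) % numPartitions // non-negative modulo
    }

    Because the hash is deterministic, identical values always land in the same partition, which explains the skew observed for df2 below.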
    

    df.repartitionByRange(4, col("value"))

    Also in this case we get 4 partitions, but this time the min and max values clearly show the range of values within each partition. The data is almost equally distributed, with roughly 250000 values per partition.

    +---------+------+---------+---------+
    |partition|count |min_value|max_value|
    +---------+------+---------+---------+
    |0        |244803|0        |244802   |
    |1        |255376|244803   |500178   |
    |2        |249777|500179   |749955   |
    |3        |250045|749956   |1000000  |
    +---------+------+---------+---------+
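
    Note that repartitionByRange determines the range boundaries by sampling the column, which is why the partitions are only approximately equal in size. The number of sample points per partition can be tuned:

    // Range boundaries are estimated from a sample of the data; a larger sample
    // yields more evenly sized partitions at the cost of more sampling work
    spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", "200")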
    

    df2.repartition(4, col("value"))

    Now we are using the other Dataframe, df2. Here, the hashing algorithm only sees the values 0, 5000, 10000 and 100000. Since the hash of the value 0 is always the same, all zeros end up in the same partition (in this case partition 3). The other three partitions contain only one row each.

    +---------+-------+---------+---------+
    |partition|count  |min_value|max_value|
    +---------+-------+---------+---------+
    |0        |1      |100000   |100000   |
    |1        |1      |10000    |10000    |
    |2        |1      |5000     |5000     |
    |3        |1000001|0        |0        |
    +---------+-------+---------+---------+
    

    df2.repartition(4)

    Without using the content of the column "value", the repartition method distributes the rows on a round-robin basis. All partitions end up with almost exactly the same amount of data, but since the values are ignored, identical values are scattered across partitions (note that min_value is 0 in every partition).

    +---------+------+---------+---------+
    |partition|count |min_value|max_value|
    +---------+------+---------+---------+
    |0        |250002|0        |5000     |
    |1        |250002|0        |10000    |
    |2        |249998|0        |100000   |
    |3        |250002|0        |0        |
    +---------+------+---------+---------+
    

    df2.repartitionByRange(4, col("value"))

    This case shows that the Dataframe df2 is not well suited for repartitioning by range, as almost all values are 0. Although 4 partitions were requested, we end up with only two partitions that contain any data, and partition 0 holds all the zeros.

    +---------+-------+---------+---------+
    |partition|count  |min_value|max_value|
    +---------+-------+---------+---------+
    |0        |1000001|0        |0        |
    |1        |3      |5000     |100000   |
    +---------+-------+---------+---------+
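
    Summarizing the three strategies observed above (same Dataframes as in the experiments):

    df.repartition(4)                        // round-robin: evenly sized partitions, values ignored
    df.repartition(4, col("value"))          // hash: same value -> same partition (risk of skew)
    df.repartitionByRange(4, col("value"))   // range: contiguous, sorted value ranges per partition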