Tags: scala, apache-spark, apache-spark-sql, partition, partitioner

How does "Exchange hashpartitioning" work in Spark?


I have a dataset that I want to write, sorted, into Parquet files, so that later Spark queries on these files can benefit from predicate pushdown.

Currently I use repartition with a column and the number of partitions to move each row to a particular partition. The column identifies the target partition (from 0 to a fixed n). The result is unexpected: Spark seems to create fewer partitions than requested (some of them are empty). Could this be a hash collision?

To solve the problem I looked for the cause and for workarounds. One workaround is to convert the DataFrame to an RDD and use partitionBy with a HashPartitioner. To my surprise, this gave the expected result. However, converting a DataFrame to an RDD is not an option for me, because it takes too many resources.

I have tested this environment on

Here are my tests with their output. Run them in spark-shell:

    scala> import org.apache.spark.HashPartitioner
    import org.apache.spark.HashPartitioner

    scala> val mydataindex = Array(0,1, 2, 3,4)
    mydataindex: Array[Int] = Array(0, 1, 2, 3, 4)

    scala> val mydata = sc.parallelize(for {
         |  x <- mydataindex
         |  y <- Array(123,456,789)
         | } yield (x, y), 100)
    mydata: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

    scala> val rddMyData = mydata.partitionBy(new HashPartitioner(5))
    rddMyData: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:26

    scala> val rddMyDataPartitions =   rddMyData.mapPartitionsWithIndex{
         |                 (index, iterator) => {
         |                    val myList = iterator.toList
         |                    myList.map(x => x + " -> " + index).iterator
         |                 }
         |              }
    rddMyDataPartitions: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:26

    scala>
         | // this is expected:

    scala> rddMyDataPartitions.take(100)
    res1: Array[String] = Array((0,123) -> 0, (0,456) -> 0, (0,789) -> 0, (1,123) -> 1, (1,456) -> 1, (1,789) -> 1, (2,123) -> 2, (2,456) -> 2, (2,789) -> 2, (3,456) -> 3, (3,789) -> 3, (3,123) -> 3, (4,789) -> 4, (4,123) -> 4, (4,456) -> 4)

    scala> val dfMyData = mydata.toDF()
    dfMyData: org.apache.spark.sql.DataFrame = [_1: int, _2: int]

    scala> val dfMyDataRepartitioned = dfMyData.repartition(5,col("_1"))
    dfMyDataRepartitioned: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_1: int, _2: int]

    scala> dfMyDataRepartitioned.explain(false)
    == Physical Plan ==
    Exchange hashpartitioning(_1#3, 5)
    +- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#3, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#4]
       +- Scan ExternalRDDScan[obj#2]

    scala> val dfMyDataRepartitionedPartition  = dfMyDataRepartitioned.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").count()
    dfMyDataRepartitionedPartition: org.apache.spark.sql.DataFrame = [partition_id: int, count: bigint]

    scala> // this is unexpected, because 1 partition has more indexes

    scala> dfMyDataRepartitionedPartition.show()
    +------------+-----+
    |partition_id|count|
    +------------+-----+
    |           1|    6|
    |           3|    3|
    |           4|    3|
    |           2|    3|
    +------------+-----+

I first thought that HashPartitioner is used when repartitioning a DataFrame, but this does not seem to be the case, because HashPartitioner works on RDDs.

Could anyone explain how this "Exchange hashpartitioning" (see the explain output above) works?

2019-01-16 12:20: This is not a duplicate of "How does HashPartitioner work?", because I am interested in the hashing algorithm used by repartition by column (plus number of partitions) on an Integer column. The general HashPartitioner works as expected, as you can see in the source code.


Solution

  • There is nothing unexpected here. As explained in "How does HashPartitioner work?", Spark uses hash(key) modulo the number of partitions, and a non-uniform distribution, especially on small datasets, is not unexpected.

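    The difference between Dataset and RDD is expected as well, because the two use different hash functions (see the same answer): the RDD HashPartitioner uses the key's hashCode, while the Dataset exchange uses a Murmur3 hash of the column values.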

    Finally, the statement

    "The result is that scala/spark is generating an unexpected result and creating less partitions"

    is not a correct observation. The number of partitions created is exactly the number requested:

    scala> dfMyDataRepartitioned.rdd.getNumPartitions
    res8: Int = 5
    

    but the empty ones do not show up in the aggregation, because they contain no rows to group and count.
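
The points above can be checked with a minimal sketch. It assumes that the exchange places a row in partition pmod(hash(_1), numPartitions), where `hash` from `org.apache.spark.sql.functions` is Spark's Murmur3 hash; treat that as an assumption to verify against your Spark version. The names `session`, `repartitioned`, `withIds` and `sizes` are illustrative and do not come from the question.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, hash, lit, pmod, spark_partition_id}

// In spark-shell, getOrCreate() returns the session that already exists.
val session = SparkSession.builder()
  .master("local[*]")
  .appName("hashpartitioning-sketch")
  .getOrCreate()

val numPartitions = 5

// Same shape as the data in the question: keys 0..4, three values per key.
val rows = for {
  x <- 0 until numPartitions
  y <- Seq(123, 456, 789)
} yield (x, y)

// Tuple fields get the default column names _1 and _2.
val df = session.createDataFrame(rows)
val repartitioned = df.repartition(numPartitions, col("_1"))

// RDD side: HashPartitioner uses key.hashCode modulo numPartitions,
// and the hashCode of an Int is the Int itself, so key k lands in partition k.
val rddPartitioner = new HashPartitioner(numPartitions)
(0 until numPartitions).foreach { k =>
  println(s"RDD HashPartitioner: key $k -> partition ${rddPartitioner.getPartition(k)}")
}

// Dataset side (assumption): partition id = pmod(murmur3(_1), numPartitions).
val withIds = repartitioned
  .withColumn("actual_id", spark_partition_id())
  .withColumn("predicted_id", pmod(hash(col("_1")), lit(numPartitions)))

withIds.select("_1", "actual_id", "predicted_id").distinct().orderBy("_1").show()

// Row count of every partition, including the empty ones that a
// groupBy("partition_id").count() cannot report.
val sizes = repartitioned.rdd.mapPartitions(it => Iterator(it.size)).collect()
sizes.zipWithIndex.foreach { case (n, i) => println(s"partition $i: $n rows") }
```

If the assumption holds, `actual_id` and `predicted_id` agree for every key, and `sizes` always has five entries, some of which may be 0. Because two different keys can map to the same partition id, one partition can hold the rows of two keys while another stays empty.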