apache-spark, pyspark, apache-spark-sql, partitioning

PySpark: how to specify the REBALANCE partitioning hint with columns


How do I specify the REBALANCE partitioning hint with column names using the pyspark APIs?

As an example, let's assume we have

df = spark.range(10)

The following attempt fails:

>>> df.hint("rebalance", "id").explain()
...
pyspark.sql.utils.AnalysisException: REBALANCE Hint parameter should include columns, but id found

How do I specify the columns, if not by name as a simple string?

Note that specifying the rebalance hint without columns works just fine but is not what I'm looking for:

>>> df.hint("rebalance").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(4), REBALANCE_PARTITIONS_BY_NONE, [id=#551]
   +- Range (0, 10, step=1, splits=4)

Solution

  • I've done some research and this appears to be a bug in Spark (more or less). The column names are never converted into org.apache.spark.sql.catalyst.expressions.Expression as they are in df.repartition.

    Even in Scala, passing a string causes the same exception:

    scala> df.hint("rebalance", "id")
    
    org.apache.spark.sql.AnalysisException: REBALANCE Hint parameter should include columns, but id found
    

    So does passing the column (surprisingly):

    scala> df.hint("rebalance", $"id")
    
    org.apache.spark.sql.AnalysisException: REBALANCE Hint parameter should include columns, but id found
    

    But getting the column's expression works:

    scala> df.hint("rebalance", $"id".expr)
    res10: org.apache.spark.sql.Dataset[Long] = [id: bigint]
    
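    The same expression trick can be reached from PySpark by going through private APIs (`Column._jc`, `DataFrame._jdf`, and the `_to_seq` helper), which may change between releases. This is a sketch, not a supported interface; the `rebalance_by` helper is my own name, and it assumes a recent Spark (roughly 3.3+, where `df.sparkSession` is a public property):

    ```python
    # Workaround via PySpark internals: get each column's Catalyst Expression
    # (the PySpark analogue of Scala's $"id".expr) and call the JVM hint directly,
    # bypassing PySpark's parameter-type check.
    from pyspark.sql import DataFrame, SparkSession
    from pyspark.sql.column import _to_seq  # private helper, may change

    spark = SparkSession.builder.master("local[*]").getOrCreate()
    df = spark.range(10)

    def rebalance_by(df, *colnames):
        sc = df.sparkSession.sparkContext              # df.sparkSession needs Spark >= 3.3
        exprs = [df[c]._jc.expr() for c in colnames]   # Catalyst Expressions (internal)
        jdf = df._jdf.hint("rebalance", _to_seq(sc, exprs))
        return DataFrame(jdf, df.sparkSession)

    rebalance_by(df, "id").explain()
    ```

    Because this relies on internals, treat it as a stopgap until the hint accepts column parameters directly.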

    I'm going to raise an issue on the main Spark project about this and will update this answer with the issue.
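    In the meantime, a practical workaround that avoids private APIs is the SQL hint syntax, which does accept column names. A sketch, assuming Spark >= 3.2 (where the REBALANCE hint was introduced); the view name `ids` is arbitrary:

    ```python
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").getOrCreate()
    df = spark.range(10)

    # Register the DataFrame as a temp view so the SQL hint can reference it.
    df.createOrReplaceTempView("ids")

    # The SQL REBALANCE hint accepts column names directly.
    rebalanced = spark.sql("SELECT /*+ REBALANCE(id) */ * FROM ids")
    rebalanced.explain()
    ```

    The resulting plan should show a column-based rebalance exchange rather than the `REBALANCE_PARTITIONS_BY_NONE` seen in the no-columns example above.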