How do I specify the REBALANCE partitioning hint with column names using the PySpark APIs?
As an example, let's assume we have
>>> df = spark.range(10)
The following attempt fails:
>>> df.hint("rebalance", "id").explain()
...
pyspark.sql.utils.AnalysisException: REBALANCE Hint parameter should include columns, but id found
How do I specify the columns, if not by name as a simple string? Qualifying the name with an alias fails the same way:
>>> df.alias("df").hint("rebalance", "df.id").explain()
...
pyspark.sql.utils.AnalysisException: REBALANCE Hint parameter should include columns, but df.id found
Passing a Column object does not work either:
>>> import pyspark.sql.functions as F
>>> df.hint("rebalance", F.col("id")).explain()
TypeError: all parameters should be in (<class 'str'>, <class 'list'>, <class 'float'>, <class 'int'>), got Column<'id'> of type <class 'pyspark.sql.column.Column'>
Note that specifying the rebalance hint without columns works just fine but is not what I'm looking for:
>>> df.hint("rebalance").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(4), REBALANCE_PARTITIONS_BY_NONE, [id=#551]
+- Range (0, 10, step=1, splits=4)
I've done some research and this appears to be a bug (or at least a gap) in Spark: unlike in df.repartition, the column-name strings are never converted into org.apache.spark.sql.catalyst.expressions.Expression objects before the hint is resolved.
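For contrast, repartition does perform that conversion itself, which is why plain names work there; a quick sanity check with the same df:
>>> df.repartition("id").explain()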
Even in Scala, passing a string causes the same exception:
scala> df.hint("rebalance", "id")
org.apache.spark.sql.AnalysisException: REBALANCE Hint parameter should include columns, but id found
So does passing the column (surprisingly):
scala> df.hint("rebalance", $"id")
org.apache.spark.sql.AnalysisException: REBALANCE Hint parameter should include columns, but id found
But getting the column's expression works:
scala> df.hint("rebalance", $"id".expr)
res10: org.apache.spark.sql.Dataset[Long] = [id: bigint]
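The same trick can be reproduced from PySpark by dropping down to the JVM objects and passing Expressions directly. A sketch, with the caveat that _jdf, _jc, and _to_seq are private internals and may change between Spark versions:
>>> import pyspark.sql.functions as F
>>> from pyspark.sql import DataFrame
>>> from pyspark.sql.column import _to_seq  # private helper
>>> exprs = [F.col("id")._jc.expr()]  # unwrap the underlying Catalyst Expression
>>> jdf = df._jdf.hint("rebalance", _to_seq(spark.sparkContext, exprs))
>>> DataFrame(jdf, df.sparkSession).explain()  # on Spark < 3.3, pass df.sql_ctx instead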
I'm going to raise an issue on the main Spark project about this and will update this answer with the issue.
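In the meantime, the SQL hint syntax is a workaround that sidesteps the DataFrame API entirely, since the SQL parser resolves the hint parameters to attribute references rather than string literals. A sketch, assuming Spark 3.2+ (where the REBALANCE hint was introduced) and using an arbitrary view name t:
>>> df.createOrReplaceTempView("t")
>>> spark.sql("SELECT /*+ REBALANCE(id) */ * FROM t").explain()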