I have a large (TBs) dataset consisting of multiple tables. The end user needs to be able to join them efficiently. The unique key for each record is continuous, and it is important that records with key values close to one another are partitioned together.
The obvious solution is to use repartitionByRange on both dataframes prior to joining, but Spark 3.5.2 insists on shuffling by hash when joining them, even though this should not be necessary.
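Here is a minimal sketch of the setup (the paths, key column name, and partition count are placeholders; the real tables are multi-TB):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder inputs; in reality these are multi-TB tables.
left = spark.read.parquet("/data/left")
right = spark.read.parquet("/data/right")

# Range-partition both sides on the join key.
left_ranged = left.repartitionByRange(200, "key")
right_ranged = right.repartitionByRange(200, "key")

# Spark still inserts a hash exchange on both sides of this join.
left_ranged.join(right_ranged, "key").explain()
```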
The alternative strategy is to bucket and sort on this key (sketch below); this avoids the shuffle at join time and is 4x faster on a small subset (a gap that will only widen with data volume).
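For comparison, the bucketed variant looks roughly like this, continuing from the sketch above (table names and bucket count are again placeholders; note that bucketing requires writing out via saveAsTable):

```python
# Write both tables bucketed and sorted on the key (requires saveAsTable).
(left.write
     .bucketBy(200, "key")
     .sortBy("key")
     .mode("overwrite")
     .saveAsTable("left_bucketed"))

(right.write
      .bucketBy(200, "key")
      .sortBy("key")
      .mode("overwrite")
      .saveAsTable("right_bucketed"))

# Joining the bucketed tables produces no Exchange on either side.
spark.table("left_bucketed").join(spark.table("right_bucketed"), "key").explain()
```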
The key difference in the .explain() output is this line:

```
+- Exchange hashpartitioning(…), ENSURE_REQUIREMENTS, [id=#388]
```
This line is not present if I bucket and sort instead of using repartitionByRange.
For various reasons specific to my use case, I don't want to use the bucketing workaround, and I'd like to understand why Spark behaves like this anyway.
Why does Spark insist on doing a full shuffle after repartitionByRange? Isn't this a pretty major flaw/bug?
It turns out that this is essentially a flaw in Spark: two independently range-partitioned dataframes are not guaranteed to have identical partition boundaries (the boundaries come from per-dataframe sampling), so the planner cannot treat them as co-partitioned and inserts the hash exchange regardless. The correct solution is to use Iceberg to partition the data on write and take advantage of storage-partitioned joins: https://medium.com/expedia-group-tech/turbocharge-efficiency-slash-costs-mastering-spark-iceberg-joins-with-storage-partitioned-join-03fdc1ff75c0.
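A rough sketch of the storage-partitioned-join setup, under my assumptions: the catalog/table names are placeholders, and the config keys are the ones documented for Spark 3.3+ and recent Iceberg releases (check the article and the docs for your versions):

```python
# Enable storage-partitioned joins (config names per Spark/Iceberg docs).
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")

# Both tables must be partitioned with the same transform on the join key.
spark.sql("""
    CREATE TABLE local.db.left_spj (key BIGINT, payload STRING)
    USING iceberg
    PARTITIONED BY (bucket(128, key))
""")
spark.sql("""
    CREATE TABLE local.db.right_spj (key BIGINT, other STRING)
    USING iceberg
    PARTITIONED BY (bucket(128, key))
""")

# With matching partition layouts and the configs above, .explain() on this
# join shows no Exchange on either side.
spark.sql("""
    SELECT * FROM local.db.left_spj l
    JOIN local.db.right_spj r ON l.key = r.key
""").explain()
```

Since in my case records with nearby key values need to land together, Iceberg's truncate(width, key) transform may be a better fit than bucket here; storage-partitioned joins only require that both sides share the same partition transform on the join key.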