After applying sortWithinPartitions to a df and writing the output to a table I'm getting a result I'm not sure how to interpret.
df
.select($"type", $"id", $"time")
.sortWithinPartitions($"type", $"id", $"time")
The result file looks somewhat like this:
1 a 5
2 b 1
1 a 6
2 b 2
1 a 7
2 b 3
1 a 8
2 b 4
It's not actually random, but it isn't sorted the way I would expect either, namely first by type, then id, then time. If I repartition before sorting, I get the order I want, but for some reason the files are 5 times larger (100 GB vs 20 GB).
I'm writing to a Hive ORC table with compression set to Snappy.
Does anyone know why it's sorted like this, and why a repartition gets the right order but a larger size?
Using Spark 2.2.
The documentation of sortWithinPartitions states:
"Returns a new Dataset with each partition sorted by the given expressions."
The easiest way to think of this function is to imagine a fourth column (the partition id) that is used as the primary sorting criterion. The function spark_partition_id() returns the id of the partition each row belongs to.
For example, if you have just one large partition (something that you as a Spark user would never do!), sortWithinPartitions works as a normal sort:
df.repartition(1)
.sortWithinPartitions("type","id","time")
.withColumn("partition", spark_partition_id())
.show();
prints
+----+---+----+---------+
|type| id|time|partition|
+----+---+----+---------+
|   1|  a|   5|        0|
|   1|  a|   6|        0|
|   1|  a|   7|        0|
|   1|  a|   8|        0|
|   2|  b|   1|        0|
|   2|  b|   2|        0|
|   2|  b|   3|        0|
|   2|  b|   4|        0|
+----+---+----+---------+
If there are more partitions, the results are only sorted within each partition:
df.repartition(4)
.sortWithinPartitions("type","id","time")
.withColumn("partition", spark_partition_id())
.show();
prints
+----+---+----+---------+
|type| id|time|partition|
+----+---+----+---------+
|   2|  b|   1|        0|
|   2|  b|   3|        0|
|   1|  a|   5|        1|
|   1|  a|   6|        1|
|   1|  a|   8|        2|
|   2|  b|   2|        2|
|   1|  a|   7|        3|
|   2|  b|   4|        3|
+----+---+----+---------+
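The "partition id as the primary sorting criterion" picture can be checked without a cluster. Below is a minimal sketch in plain Scala in which nested Seqs stand in for Spark partitions. Rec, PartitionSortModel, sortWithin and sortWithPartitionKey are hypothetical names made up for illustration, not Spark APIs, and the sample rows are the ones from the repartition(4) output above.

```scala
// Plain Scala collections stand in for Spark partitions; nothing here calls Spark.
final case class Rec(tpe: Int, id: String, time: Int)

object PartitionSortModel {
  // Sort every partition on its own; partition boundaries and order stay as they are.
  def sortWithin(partitions: Seq[Seq[Rec]]): Seq[Seq[Rec]] =
    partitions.map(_.sortBy(r => (r.tpe, r.id, r.time)))

  // Same result seen as one global sort with the partition index as the leading key.
  def sortWithPartitionKey(partitions: Seq[Seq[Rec]]): Seq[(Int, Rec)] =
    partitions.zipWithIndex
      .flatMap { case (rows, pid) => rows.map(r => (pid, r)) }
      .sortBy { case (pid, r) => (pid, r.tpe, r.id, r.time) }

  // The four partitions from the repartition(4) output, deliberately unsorted.
  val example: Seq[Seq[Rec]] = Seq(
    Seq(Rec(2, "b", 3), Rec(2, "b", 1)),
    Seq(Rec(1, "a", 6), Rec(1, "a", 5)),
    Seq(Rec(2, "b", 2), Rec(1, "a", 8)),
    Seq(Rec(2, "b", 4), Rec(1, "a", 7))
  )
}
```

Flattening sortWithin(example) yields the same row order as sortWithPartitionKey(example). That order is sorted inside each partition but not globally, which is the interleaved pattern seen in the question.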
Why would one use sortWithinPartitions instead of sort? sortWithinPartitions does not trigger a shuffle: each partition is sorted where it already lives, so no rows move between executors. sort, however, needs a global order across all partitions and will trigger a shuffle. Therefore sortWithinPartitions usually executes faster. If the data is partitioned by a meaningful column, sorting within each partition might be enough.
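As a rough illustration of "partitioned by a meaningful column", here is a minimal sketch that first hash-partitions the question's sample rows by type and then sorts inside each partition. It assumes Spark SQL is on the classpath and uses a local master. The object name ClusterThenSort and the helper clusterAndSort are hypothetical. Note that repartition by a column is itself a shuffle, so the saving only applies when the data already arrives partitioned that way.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, spark_partition_id}

object ClusterThenSort {
  // Hypothetical helper: co-locate rows of the same type, then sort inside each partition.
  def clusterAndSort(df: DataFrame): DataFrame =
    df.repartition(col("type")) // hash partitioning by type; this step shuffles
      .sortWithinPartitions(col("type"), col("id"), col("time")) // no further shuffle
      .withColumn("partition", spark_partition_id()) // expose the partition id

  def main(args: Array[String]): Unit = {
    // Local session for experimentation only; master and app name are assumptions.
    val spark = SparkSession.builder()
      .appName("sortWithinPartitions-sketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Sample rows taken from the question.
    val df = Seq(
      (1, "a", 5), (2, "b", 1), (1, "a", 6), (2, "b", 2),
      (1, "a", 7), (2, "b", 3), (1, "a", 8), (2, "b", 4)
    ).toDF("type", "id", "time")

    clusterAndSort(df).show()
    spark.stop()
  }
}
```

With this layout all rows of one type end up in the same partition, so each type's rows come out ordered by id and time. The partition ids themselves depend on hashing and on spark.sql.shuffle.partitions, and the order across different types is still not guaranteed.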