I have the following script which returns properly sorted result:
from transforms.api import transform, Output
from pyspark.sql import functions as F


@transform(
    out=Output("ri.foundry.main.dataset.29fdbff7-168a-457d-bb79-8f7508cede9d"),
)
def compute(out, ctx):
    data = [("1", "2022-02-01", "older"),
            ("1", "2022-02-12", "older"),
            ("1", "2022-02-09", "new")]
    df_inp = (
        ctx.spark_session.createDataFrame(data, ["c1", "date", "record_status"])
        .withColumn("date", F.to_date("date"))
        .withColumn("record_status", F.lit("older"))
    )
    df_upd = (
        ctx.spark_session.createDataFrame([("1",)], ["c1"])
        .withColumn("date", F.to_date(F.lit("2022-02-17")))
        .withColumn("record_status", F.lit("new"))
    )
    df = df_inp.unionByName(df_upd)
    df = df.coalesce(1)
    df = df.sort(F.desc("date"))
    out.write_dataframe(df)
Notice the df = df.coalesce(1) before the sort.
Question. As both df.coalesce(1) and df.repartition(1) should result in one partition, I tried to replace df = df.coalesce(1) with df = df.repartition(1). But then the result appeared not sorted. Why?
Additional details
If I don't interfere with the partitioning, the result also appears unsorted:
Physical plan using coalesce(1):
+- *(3) Sort [date#6 DESC NULLS LAST], true, 0
+- Coalesce 1
+- Union
:- *(1) Project [c1#0, cast(date#1 as date) AS date#6, older AS record_status#10]
: +- *(1) Scan ExistingRDD[c1#0,date#1,record_status#2]
+- *(2) Project [c1#14, 19040 AS date#16, new AS record_status#19]
+- *(2) Scan ExistingRDD[c1#14]
Physical plan using repartition(1):
+- *(3) Sort [date#6 DESC NULLS LAST], true, 0
+- CustomShuffleReader coalesced
+- ShuffleQueryStage 1
+- Exchange rangepartitioning(date#6 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#85]
+- ShuffleQueryStage 0
+- Exchange RoundRobinPartitioning(1), REPARTITION_WITH_NUM, [id=#83]
+- Union
:- *(1) Project [c1#0, cast(date#1 as date) AS date#6, older AS record_status#10]
: +- *(1) Scan ExistingRDD[c1#0,date#1,record_status#2]
+- *(2) Project [c1#14, 19040 AS date#16, new AS record_status#19]
+- *(2) Scan ExistingRDD[c1#14]
I am aware of the question Difference between repartition(1) and coalesce(1), where the asker says they cannot use coalesce(1) for some reason. In my case it's the opposite.
The reason why the result of repartitioning isn't sorted is visible in the query plans you've listed: it writes out multiple partitions rather than one. There are two Exchanges; the first (lower in the plan) brings the data to a single partition, but the second (higher in the plan) does a RangePartitioning into up to 200 partitions (the default spark.sql.shuffle.partitions), on which the Sort then happens. Each resulting partition/file is most likely sorted, but the order across files isn't maintained.
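That "each file is sorted, but global order is lost" effect can be illustrated without Spark. A minimal pure-Python sketch, using hypothetical part-file names and the dates from the question (the range split and read order are assumptions for illustration only):

```python
# Hypothetical range-partitioned output: two part files, each internally
# sorted descending, with file boundaries respecting the range split.
files = {
    "part-00000": ["2022-02-17", "2022-02-12"],  # newer dates
    "part-00001": ["2022-02-09", "2022-02-01"],  # older dates
}

# Every individual file is sorted descending:
assert all(f == sorted(f, reverse=True) for f in files.values())

# But if a reader happens to concatenate the files in the other order,
# the overall result is no longer sorted:
read_back = files["part-00001"] + files["part-00000"]
assert read_back != sorted(read_back, reverse=True)
```

With a single output partition (as coalesce(1) before the sort produces), there is only one file, so there is no cross-file order to lose.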
This seems to be a bug in Spark 3.0.2, which Foundry's Spark is currently based on. Testing different Spark versions, I see this happening on 3.0.2:
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.0.2
/_/
>>> df = spark.createDataFrame([{"a": 3*i} for i in range(10000)])
>>> sorted = df.repartition(1).sort("a")
>>> sorted.explain()
== Physical Plan ==
*(2) Sort [a#0L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#0L ASC NULLS FIRST, 200), true, [id=#15]
+- Exchange RoundRobinPartitioning(1), false, [id=#14]
+- *(1) Scan ExistingRDD[a#0L]
but not on 3.2.0 (with AQE disabled just to match 3.0.2; it doesn't affect the result):
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.2.0
/_/
>>> df = spark.createDataFrame([{"a": 3*i} for i in range(10000)])
>>> sorted = df.repartition(1).sort("a")
>>> sorted.explain()
== Physical Plan ==
*(2) Sort [a#0L ASC NULLS FIRST], true, 0
+- Exchange SinglePartition, REPARTITION_BY_NUM, [id=#12]
+- *(1) Scan ExistingRDD[a#0L]
Note how 3.2.0 shows the initial Exchange as SinglePartition rather than RoundRobinPartitioning(1), and based on that it is able to skip the range-partitioning exchange otherwise needed for the Sort.
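The intuition behind that optimization can be sketched in plain Python (no Spark needed; the numbers are arbitrary example data): a partition-local sort is only a global sort when the planner knows there is exactly one partition, which is what recognizing SinglePartition gives 3.2.0.

```python
data = [5, 1, 9, 3]

# Two partitions, each sorted locally: flattening them does NOT give a
# globally sorted result, so a range exchange before the Sort is required.
two_parts = [sorted([5, 9]), sorted([1, 3])]
assert [x for p in two_parts for x in p] != sorted(data)

# A single known partition: one local sort already IS the global sort,
# so no range-partitioning exchange is needed.
one_part = [sorted(data)]
assert [x for p in one_part for x in p] == sorted(data)
```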