apache-spark, pyspark, apache-spark-sql, palantir-foundry, foundry-code-repositories

Different sort results after coalesce(1) vs repartition(1)


I have the following script, which returns a properly sorted result:

from transforms.api import transform, Output
from pyspark.sql import functions as F


@transform(
    out=Output("ri.foundry.main.dataset.29fdbff7-168a-457d-bb79-8f7508cede9d"),
)
def compute(out, ctx):

    data = [("1", "2022-02-01", "older"),
            ("1", "2022-02-12", "older"),
            ("1", "2022-02-09", "new")]
    df_inp = (
        ctx.spark_session.createDataFrame(data, ["c1", "date", "record_status"])
        .withColumn("date", F.to_date("date"))
        .withColumn("record_status", F.lit("older"))
    )
    df_upd = (
        ctx.spark_session.createDataFrame([('1',)], ['c1'])
        .withColumn('date', F.to_date(F.lit('2022-02-17')))
        .withColumn('record_status', F.lit('new'))
    )

    df = df_inp.unionByName(df_upd)

    df = df.coalesce(1)
    df = df.sort(F.desc('date'))
    out.write_dataframe(df)

Notice df = df.coalesce(1) before the sort.

[screenshot: sorted after coalesce]

Question. As both df.coalesce(1) and df.repartition(1) should result in a single partition, I tried replacing df = df.coalesce(1) with df = df.repartition(1). But then the result appeared unsorted. Why?

[screenshot: not sorted after repartition]
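
Both calls do report a single partition when checked directly (a quick sanity check in a plain PySpark shell, with a df like the one built above):

print(df.coalesce(1).rdd.getNumPartitions())     # 1
print(df.repartition(1).rdd.getNumPartitions())  # 1

So the partition count itself is not where the two differ.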

Additional details

If I don't interfere with the partitioning at all, the result also comes out unsorted:

[screenshot: not sorted without repartitioning]

Physical plan using coalesce(1):

  +- *(3) Sort [date#6 DESC NULLS LAST], true, 0
     +- Coalesce 1
        +- Union
           :- *(1) Project [c1#0, cast(date#1 as date) AS date#6, older AS record_status#10]
           :  +- *(1) Scan ExistingRDD[c1#0,date#1,record_status#2]
           +- *(2) Project [c1#14, 19040 AS date#16, new AS record_status#19]
              +- *(2) Scan ExistingRDD[c1#14]

Physical plan using repartition(1):

  +- *(3) Sort [date#6 DESC NULLS LAST], true, 0
     +- CustomShuffleReader coalesced
        +- ShuffleQueryStage 1
           +- Exchange rangepartitioning(date#6 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#85]
              +- ShuffleQueryStage 0
                 +- Exchange RoundRobinPartitioning(1), REPARTITION_WITH_NUM, [id=#83]
                    +- Union
                       :- *(1) Project [c1#0, cast(date#1 as date) AS date#6, older AS record_status#10]
                       :  +- *(1) Scan ExistingRDD[c1#0,date#1,record_status#2]
                       +- *(2) Project [c1#14, 19040 AS date#16, new AS record_status#19]
                          +- *(2) Scan ExistingRDD[c1#14]

I am aware of the question Difference between repartition(1) and coalesce(1), where the asker says they cannot use coalesce(1) for some reason. In my case it's the opposite.


Solution

  • The reason the result of repartitioning isn't sorted is visible in the query plans you've listed - it writes out multiple partitions rather than one. There are two Exchanges: the first (lower) brings the data to a single partition, but the second (higher in the plan) does a RangePartitioning into up to 200 partitions (the default spark.sql.shuffle.partitions), on which the Sort happens. Each resulting partition/file is most likely sorted, but the order across files isn't maintained.
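
    A quick way to see this outside Foundry is to group the result's rows by partition with glom() (a minimal sketch, assuming a plain PySpark 3.0.2 shell where spark is the session):

    from pyspark.sql import functions as F

    # 1000 distinct values in a scrambled order
    df = spark.range(1000).withColumn("a", (F.col("id") * 37) % 1000)
    parts = df.repartition(1).sort("a").rdd.glom().collect()

    non_empty = [p for p in parts if p]
    print(len(non_empty))            # > 1 on 3.0.2: the range shuffle fans the data out again
    for p in non_empty:
        vals = [row["a"] for row in p]
        assert vals == sorted(vals)  # each partition is internally sorted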

    This seems to be a bug in Spark 3.0.2, which Foundry's Spark is currently based on. Testing different Spark versions, I see this happening on 3.0.2:

          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.0.2
          /_/
    
    >>> df = spark.createDataFrame([{"a": 3*i} for i in range(10000)])
    >>> sorted = df.repartition(1).sort("a")
    >>> sorted.explain()
    == Physical Plan ==
    *(2) Sort [a#0L ASC NULLS FIRST], true, 0
    +- Exchange rangepartitioning(a#0L ASC NULLS FIRST, 200), true, [id=#15]
       +- Exchange RoundRobinPartitioning(1), false, [id=#14]
          +- *(1) Scan ExistingRDD[a#0L]
    

    but not on 3.2.0 (with AQE disabled just to match 3.0.2; it doesn't affect the result):

          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.2.0
          /_/
    
    >>> df = spark.createDataFrame([{"a": 3*i} for i in range(10000)])
    >>> sorted = df.repartition(1).sort("a")
    >>> sorted.explain()
    == Physical Plan ==
    *(2) Sort [a#0L ASC NULLS FIRST], true, 0
    +- Exchange SinglePartition, REPARTITION_BY_NUM, [id=#12]
       +- *(1) Scan ExistingRDD[a#0L]
    

    Note how 3.2.0 shows the initial Exchange as SinglePartition rather than RoundRobinPartitioning(1), and based on that it is able to skip the rangepartitioning that the Sort would otherwise require.
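
    While on 3.0.2, one possible workaround (a sketch I'd suggest, not something the plans above prove) is to force the single partition first and then sort within it, which plans no second exchange at all:

    # repartition(1) leaves exactly one partition; sortWithinPartitions
    # requires no shuffle, so no rangepartitioning Exchange is added and
    # the single output file ends up globally sorted.
    df = df.repartition(1).sortWithinPartitions(F.desc("date"))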