apache-spark, pyspark, palantir-foundry, incremental-build, foundry-code-repositories

orderBy and sort are not applied on the full dataframe


The final result is sorted on column 'timestamp'. I have two scripts which differ only in one value written to the column 'record_status' ('old' vs. 'older'). As the data is sorted on column 'timestamp', the resulting order should be identical. However, the order is different. It looks as if the sort is performed before the union in the first case and after it in the second.

Using orderBy instead of sort doesn't make any difference.

Why is this happening, and how can I prevent it? (I use Spark 3.0.2.)

Script1 (full) - result after 4 runs (builds):

[Image: resulting dataset of Script1 after 4 builds]

from transforms.api import transform, Output, incremental
from pyspark.sql import functions as F, types as T


@incremental(
    require_incremental=True,
)
@transform(
    out=Output("ri.foundry.main.dataset.a82be5aa-81f7-45cf-8c59-05912c8ed6c7"),
)
def compute(out, ctx):

    out_schema = T.StructType([
        T.StructField('c1', T.StringType()),
        T.StructField('timestamp', T.TimestampType()),
        T.StructField('record_status', T.StringType()),
    ])
    df_out = (
        out.dataframe('previous', out_schema)
        .withColumn('record_status', F.lit('older'))
    )

    df_upd = (
        ctx.spark_session.createDataFrame([('1',)], ['c1'])
        .withColumn('timestamp', F.current_timestamp())
        .withColumn('record_status', F.lit('new'))
    )

    df = df_out.unionByName(df_upd)
    df = df.sort('timestamp', ascending=False)

    out.set_mode('replace')
    out.write_dataframe(df)

Script2 (full) - result after 4 runs (builds):

[Image: resulting dataset of Script2 after 4 builds]

from transforms.api import transform, Output, incremental
from pyspark.sql import functions as F, types as T


@incremental(
    require_incremental=True,
)
@transform(
    out=Output("ri.foundry.main.dataset.caee8f7a-64b0-4837-b4f3-d5a6d5dedd85"),
)
def compute(out, ctx):

    out_schema = T.StructType([
        T.StructField('c1', T.StringType()),
        T.StructField('timestamp', T.TimestampType()),
        T.StructField('record_status', T.StringType()),
    ])
    df_out = (
        out.dataframe('previous', out_schema)
        .withColumn('record_status', F.lit('old'))
    )

    df_upd = (
        ctx.spark_session.createDataFrame([('1',)], ['c1'])
        .withColumn('timestamp', F.current_timestamp())
        .withColumn('record_status', F.lit('new'))
    )

    df = df_out.unionByName(df_upd)
    df = df.sort('timestamp', ascending=False)

    out.set_mode('replace')
    out.write_dataframe(df)

The query plans of both transformations show that the sort must be performed after the union. Inspecting the logical and physical plans, I see no differences except for IDs and RIDs; all transformation steps are in the same place:

[Image: query plans of both transformations]

Observation:
With the following profile, the sort works correctly (the query plans don't change):

@configure(["KUBERNETES_NO_EXECUTORS_SMALL"])

Solution

  • As it turns out, this behavior is not caused by @incremental. It can be observed in a regular transformation too:

    from transforms.api import transform, Output
    from pyspark.sql import functions as F
    
    
    @transform(
        out=Output("ri.foundry.main.dataset.beea7dd2-8da3-4abf-9103-464ec646dc00"),
    )
    def compute(out, ctx):
    
        data = [("1", "2022-02-16T17:48:15.653Z", "older"),
                ("1", "2022-02-16T17:46:58.054Z", "older"),
                ("1", "2022-02-16T17:50:50.850Z", "new")]
        df_inp = (
            ctx.spark_session.createDataFrame(data, ["c1", "timestamp", "record_status"])
            .withColumn("timestamp", F.to_timestamp("timestamp"))
            .withColumn("record_status", F.lit("older"))
        )
        df_upd = (
            ctx.spark_session.createDataFrame([('1',)], ['c1'])
            .withColumn('timestamp', F.current_timestamp())
            .withColumn('record_status', F.lit('new'))
        )
    
        df = df_inp.unionByName(df_upd)
        df = df.sort(F.desc('timestamp'))
    
        out.write_dataframe(df)
    

    Incorrect sort results using transform decorator without input datasets

    When asking the question, I provided two scripts: one with a supposedly working sort, the other with a failing sort. In reality, neither script works; the "correct" one just needed more runs before it started showing an incorrect sort order:

    Foundry bad sort or groupby order

    The reason lies in the partitioning of the input DataFrames. Apparently, sort and groupBy sort only within each partition (and there are several of them). For some reason, the data is not moved to a single executor or to the driver, so the combined result has no unified sort order. This is why the profile "KUBERNETES_NO_EXECUTORS_SMALL" yielded the correct sort order: all operations were performed on one node, the driver.

    The only solution I could find was calling df.coalesce(1) just before the df.sort() line:

    df = df_out.unionByName(df_upd)
    df = df.coalesce(1)
    df = df.sort(F.desc('timestamp'))
    

    Since Spark 3.2.0, df = df.repartition(1) will also work.
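
The per-partition explanation above can be illustrated without Spark. What follows is only a plain-Python sketch of that reasoning, not of Spark internals: it assumes each partition is sorted on its own and the results are concatenated in partition order. The function names and the integer "timestamps" are hypothetical and chosen for illustration.

```python
from typing import List


def sort_within_partitions(partitions: List[List[int]]) -> List[int]:
    """Sort each partition independently (descending), then concatenate in partition order."""
    result: List[int] = []
    for part in partitions:
        result.extend(sorted(part, reverse=True))
    return result


def sort_single_partition(partitions: List[List[int]]) -> List[int]:
    """Merge all rows into one partition first (the coalesce(1) idea), then sort."""
    merged = [value for part in partitions for value in part]
    return sorted(merged, reverse=True)


def is_descending(values: List[int]) -> bool:
    """Return True if every value is >= the value that follows it."""
    return all(a >= b for a, b in zip(values, values[1:]))


# Hypothetical data: the previous rows sit in one partition, the new row in another.
partitions = [[1748, 1746, 1750], [1752]]

per_partition = sort_within_partitions(partitions)
single = sort_single_partition(partitions)

print(per_partition, is_descending(per_partition))
print(single, is_descending(single))
```

In this model the newest row ends up last when each partition is sorted separately, which matches the symptom in the screenshots, while merging into a single partition before sorting gives one consistent descending order.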