apache-spark pyspark palantir-foundry

How do you avoid sorting when writing partitioned data in Spark on Palantir Foundry?


Spark is inserting a sort on the partition keys into the query plan when writing partitioned data.

This slows down builds by increasing memory usage and causing disk spill.

I am fairly sure this is new behaviour, though I can't verify it since the Spark details for old builds are no longer available.

The write logic:

df = df.repartition('date')

tgt.write_dataframe(df, partition_cols=['date'])

The logical plan looks like:

InsertIntoHadoopFsRelationCommand foundry://...
+- WriteFiles
   +- Sort [date#337 ASC NULLS FIRST], false
      +- Project ...
         +- CollectMetrics ...
            +- RepartitionByExpression [date#337]
               +- Project ...

These issues look related, but they should have been fixed in Spark 3.4+:

I believe our Foundry instance used Spark 3.2.1 until recently, but the runtime has silently been upgraded to Spark 3.4 and 3.5 regardless of the target Spark version.

Is this sort somehow necessary? Is it possible to remove it?


Solution

  • According to SPARK-44512, partitionBy does not guarantee sorting.

    It can sort your data (even after you sorted it explicitly):

    Although I understand Apache Spark 3.4.0 changes the behavior like the above, I don't think there is a contract that Apache Spark's partitionBy operation preserves the previous ordering.

    So, let me close this issue as Not A Problem.

    AFAIK this behaviour is not documented.

    To remove the sort from the execution plan, you can set this undocumented configuration value: spark.sql.optimizer.plannedWrite.enabled=false.

    The recommended way to manage configuration in Foundry is setting up a Spark profile. You can also set it directly from a transform using the injected ctx parameter:

    @transform(
        tgt=Output(...),
    )
    def compute(ctx, tgt, ...):
        df = ...

        # Prevent sort with partitioned write
        ctx.spark_session.conf.set('spark.sql.optimizer.plannedWrite.enabled', 'false')

        tgt.write_dataframe(df, partition_cols=['col1'])
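    If you go the Spark-profile route instead, the profile can be attached with the transforms `@configure` decorator. A minimal sketch, assuming a custom profile carrying `spark.sql.optimizer.plannedWrite.enabled=false` has already been defined (the profile name `DISABLE_PLANNED_WRITE` is hypothetical and must be created and allowlisted by a platform administrator first):

    ```python
    from transforms.api import configure, transform, Output

    # Hypothetical Spark profile that sets
    # spark.sql.optimizer.plannedWrite.enabled=false
    @configure(profile=['DISABLE_PLANNED_WRITE'])
    @transform(
        tgt=Output(...),
    )
    def compute(ctx, tgt):
        df = ...
        tgt.write_dataframe(df, partition_cols=['date'])
    ```

    The profile approach keeps the setting out of transform code and makes it reusable across transforms, at the cost of the upfront admin setup.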