Tags: scala, performance, apache-spark, pyspark, data-processing

What's the time complexity of forward filling and backward filling in Spark?


My question: I need to understand the time complexity of dynamic forward filling and backfilling in Spark.

Hello, I have a Scala job that reads Delta Table A, transforms the DataFrame, and writes to Delta Table B (an empty table).

The job processes 94,356,726 records and finishes in 16 minutes.

However, after adding the dynamic filling logic to the DataFrame transform, the job runs for 2 hours and 20 minutes.

That's a big gap in performance. What is the time complexity of the backfilling?
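
For reference, the job's overall shape is roughly this (a simplified sketch in PySpark, not the actual code: the real job is in Scala, the table paths are placeholders, and transform stands in for the transform described under "My method" below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read Delta Table A, apply the transform, and write to the (empty) Delta Table B.
# Requires the Delta Lake connector; paths are placeholders.
df_a = spark.read.format("delta").load("/path/to/table_a")
df_b = transform(df_a)  # hypothetical helper: dedup + the dynamic filling logic
df_b.write.format("delta").mode("append").save("/path/to/table_b")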

Details

Purpose of the backfilling

I have data like this:

id  version  data_column
0   4        null
0   3        "test 2"
0   2        null
0   1        "test 1"

I want to get the data like this:

id  version  data_column
0   4        "test 2"   // filled from version 3
0   3        "test 2"
0   2        "test 1"   // filled from version 1
0   1        "test 1"

My method

Following the method from here: Pyspark : forward fill with last observation for a DataFrame

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, desc, first}

// before adding the backfilling, the job used "partitionWindow" only for dedup
val partitionWindow = Window
  .partitionBy("id")
  .orderBy(desc("version"), desc("timestamp"))

val backfillWindow = partitionWindow
  .rowsBetween(0, Window.unboundedFollowing)

df.withColumn(
  "data_filled",
  coalesce(first("data_column", ignoreNulls = true).over(backfillWindow), col("data_column"))
)

My Observation

More Thoughts: I'm doing one-directional backfilling, so it should be close to O(N log N), right? Maybe bi-directional filling (forward and backward at the same time) would behave differently.

Thank you!


Solution

  • tl;dr

    Change the window specification from .rowsBetween(0, Window.unboundedFollowing) to .rowsBetween(Window.unboundedPreceding, 0), and reverse the bfill/ffill logic to match (flip the sort order and swap F.first for F.last), as sketched below.
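
    Applied to the question's columns, the rewrite looks like this (a sketch in PySpark; df, id, version, timestamp, and data_column are the question's names, and the coalesce mirrors the original even though it is redundant once the frame includes the current row):

    import pyspark.sql.functions as F
    from pyspark.sql import Window

    # Same fill as the question's window, but with the faster frame shape:
    # ascending order + (unboundedPreceding, 0) + last() replaces
    # descending order + (0, unboundedFollowing) + first().
    fill_window = (
        Window.partitionBy("id")
        .orderBy(F.asc("version"), F.asc("timestamp"))
        .rowsBetween(Window.unboundedPreceding, 0)
    )

    df = df.withColumn(
        "data_filled",
        F.coalesce(
            F.last("data_column", ignorenulls=True).over(fill_window),
            F.col("data_column"),
        ),
    )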

    Longer read

    From testing, execution is much worse when the bfill/ffill window is specified as .rowsBetween(0, Window.unboundedFollowing) than as .rowsBetween(Window.unboundedPreceding, 0), irrespective of whether you do backfill operations (F.first) or forward-fill operations (F.last). The query execution plan shows that the WholeStageCodegen stage takes far longer for the former than for the latter. I'm not sure why under the hood, but I've found that making the change above improves execution time by at least an order of magnitude, depending on the number of rows in your dataset.

    Testing

    Setup

    Testing in PySpark, we create some synthetic data with some null values.

    import itertools as it
    
    import pyspark.sql.functions as F
    from pyspark.sql import DataFrame, SparkSession, Window
    
    spark = SparkSession.builder.master("local[*]").getOrCreate()
    print(spark.version)
    
    # Create synthetic data
    data = list(it.product(range(30_000), ["ham", "eggs"]))
    # create null data, every x % 3 == 1 for ham and every x % 3 == 2 for eggs
    _data = []
    for _ix, t in enumerate(data):
        if ((t[1] == "ham") and ((t[0] % 3) == 1)) or ((t[1] == "eggs") and ((t[0] % 3) == 2)):
            _data.append((None, t[1], _ix))
        else:
            _data.append((*t, _ix))
    
    sdf = spark.createDataFrame(_data, ["value", "name", "timestamp"]).select(
        "name", "timestamp", "value"
    )
    
    sdf.limit(20).show()
    

    This creates the following Spark DataFrame (the 3.4.1 on the first line is the printed Spark version):

    3.4.1
    
    +----+---------+-----+
    |name|timestamp|value|
    +----+---------+-----+
    | ham|        0|    0|
    |eggs|        1|    0|
    | ham|        2| null|
    |eggs|        3|    1|
    | ham|        4|    2|
    |eggs|        5| null|
    | ham|        6|    3|
    |eggs|        7|    3|
    | ham|        8| null|
    |eggs|        9|    4|
    | ham|       10|    5|
    |eggs|       11| null|
    | ham|       12|    6|
    |eggs|       13|    6|
    | ham|       14| null|
    |eggs|       15|    7|
    | ham|       16|    8|
    |eggs|       17| null|
    | ham|       18|    9|
    |eggs|       19|    9|
    +----+---------+-----+
    

    Definitions

    We define 2 different backfill operations, the first using .rowsBetween(0, Window.unboundedFollowing) and the second using .rowsBetween(Window.unboundedPreceding, 0).

    def bfill_unbounded_following():
        window = (
            Window.partitionBy(["name"])
            .orderBy(F.asc("timestamp"))
            .rowsBetween(0, Window.unboundedFollowing)
        )
        return (F.first("value", ignorenulls=True).over(window)).alias("value_bfill_following")
    
    
    def bfill_unbounded_preceding():
        window = (
            Window.partitionBy(["name"])
            .orderBy(F.desc("timestamp"))
            .rowsBetween(Window.unboundedPreceding, 0)
        )
        return (F.last("value", ignorenulls=True).over(window)).alias("value_bfill_preceding")
    
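    For reference, the forward-fill analogues (mine, not part of the original test) swap the sort direction and first/last in exactly the same way, so the same two frame shapes can be compared for ffill:

    def ffill_unbounded_following():
        window = (
            Window.partitionBy(["name"])
            .orderBy(F.desc("timestamp"))
            .rowsBetween(0, Window.unboundedFollowing)
        )
        return (F.first("value", ignorenulls=True).over(window)).alias("value_ffill_following")


    def ffill_unbounded_preceding():
        window = (
            Window.partitionBy(["name"])
            .orderBy(F.asc("timestamp"))
            .rowsBetween(Window.unboundedPreceding, 0)
        )
        return (F.last("value", ignorenulls=True).over(window)).alias("value_ffill_preceding")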

    Results

    Outputs

    First we check both functions produce the same output. To make it easier to see the imputation, we show the eggs partition:

    sdf_out = sdf.select(
        "*",
        bfill_unbounded_following(),
        bfill_unbounded_preceding(),
        (F.col("value_bfill_following") == F.col("value_bfill_preceding")).alias("check_equals"),
    ).sort("name", "timestamp")
    
    assert sdf_out.where(~F.col("check_equals")).count() == 0
    print("test passed")
    
    sdf_out.limit(20).show()
    

    The functions produce the same output, and the test has passed.

    test passed
    +----+---------+-----+---------------------+---------------------+------------+
    |name|timestamp|value|value_bfill_following|value_bfill_preceding|check_equals|
    +----+---------+-----+---------------------+---------------------+------------+
    |eggs|        1|    0|                    0|                    0|        true|
    |eggs|        3|    1|                    1|                    1|        true|
    |eggs|        5| null|                    3|                    3|        true|
    |eggs|        7|    3|                    3|                    3|        true|
    |eggs|        9|    4|                    4|                    4|        true|
    |eggs|       11| null|                    6|                    6|        true|
    |eggs|       13|    6|                    6|                    6|        true|
    |eggs|       15|    7|                    7|                    7|        true|
    |eggs|       17| null|                    9|                    9|        true|
    |eggs|       19|    9|                    9|                    9|        true|
    |eggs|       21|   10|                   10|                   10|        true|
    |eggs|       23| null|                   12|                   12|        true|
    |eggs|       25|   12|                   12|                   12|        true|
    |eggs|       27|   13|                   13|                   13|        true|
    |eggs|       29| null|                   15|                   15|        true|
    |eggs|       31|   15|                   15|                   15|        true|
    |eggs|       33|   16|                   16|                   16|        true|
    |eggs|       35| null|                   18|                   18|        true|
    |eggs|       37|   18|                   18|                   18|        true|
    |eggs|       39|   19|                   19|                   19|        true|
    +----+---------+-----+---------------------+---------------------+------------+
    
    Execution time

    Running bfill_unbounded_following() is 100x slower than bfill_unbounded_preceding()!

    %%timeit -n 1
    
    _ = sdf.select("*", bfill_unbounded_following()).collect()
    
    22.1 s ± 289 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    %%timeit -n 1
    
    _ = sdf.select("*", bfill_unbounded_preceding()).collect()
    
    191 ms ± 14.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    Execution plan

    Analysing the execution plan for the bfill_unbounded_following function, we can see where the bottleneck is: WholeStageCodegen (2) takes ~12s to run (the plan for bfill_unbounded_preceding is not shown, but it is considerably quicker).

    [Screenshot: execution plan for bfill_unbounded_following]
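
    To inspect the plan without the Spark UI, the formatted physical plan can also be printed directly (a quick sketch; explain's mode argument is available from Spark 3.0, and the per-stage timings themselves come from the SQL tab of the Spark UI rather than from explain):

    # Print the physical plan for each variant to compare the Window frame
    # definitions and the surrounding sort stages.
    sdf.select("*", bfill_unbounded_following()).explain(mode="formatted")
    sdf.select("*", bfill_unbounded_preceding()).explain(mode="formatted")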

    Now, I'm not exactly sure why there is such a large discrepancy. What's worse is that the execution time when specifying windows as .rowsBetween(0, Window.unboundedFollowing) grows non-linearly with the number of rows you have: for a 10k dataset it was 25x slower, and for a 30k dataset it was 100x slower (cf. Testing). That growth pattern is consistent with the unbounded-following frame being re-evaluated from scratch for every row (roughly O(n²) per partition), whereas an unboundedPreceding-to-current-row frame can be maintained as a running aggregate (roughly O(n) after the O(n log n) sort).
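
    A rough way to check that growth yourself (my sketch, not part of the original timings; it reuses spark, it, F, DataFrame, and the bfill definitions from the sections above) is to rebuild the dataset at a few sizes and time both variants:

    import time

    def make_sdf(n: int) -> DataFrame:
        # Rebuild the synthetic dataset from the Setup section at a given size.
        data = list(it.product(range(n), ["ham", "eggs"]))
        _data = []
        for _ix, t in enumerate(data):
            if ((t[1] == "ham") and ((t[0] % 3) == 1)) or ((t[1] == "eggs") and ((t[0] % 3) == 2)):
                _data.append((None, t[1], _ix))
            else:
                _data.append((*t, _ix))
        return spark.createDataFrame(_data, ["value", "name", "timestamp"])

    for n in (10_000, 20_000, 30_000):
        _sdf = make_sdf(n)
        for fn in (bfill_unbounded_following, bfill_unbounded_preceding):
            start = time.perf_counter()
            _ = _sdf.select("*", fn()).collect()
            print(n, fn.__name__, f"{time.perf_counter() - start:.1f}s")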