My question: I need to understand the time complexity of dynamic forward filling and backfilling in Spark.
Hello, I have a Scala job that reads Delta Table A, transforms the DataFrame, and writes to Delta Table B (an empty table).
The job processes 94,356,726 records and finishes in 16 minutes.
However, after adding dynamic filling logic to the DataFrame transform, the job runs for 2 hours and 20 minutes.
That's a big gap in performance. What is the time complexity of the backfilling?
Details
Purpose of the backfilling
I have data like this:
| id | version | data_column |
|----|---------|-------------|
| 0  | 4       | null        |
| 0  | 3       | "test 2"    |
| 0  | 2       | null        |
| 0  | 1       | "test 1"    |
I want to get the data like this:
| id | version | data_column |
|----|---------|-------------|
| 0  | 4       | "test 2" // filled from version 3 |
| 0  | 3       | "test 2"                          |
| 0  | 2       | "test 1" // filled from version 1 |
| 0  | 1       | "test 1"                          |
My method
Following the method here: Pyspark : forward fill with last observation for a DataFrame
// before adding backfilling, the job uses "partitionWindow" only for dedup
val partitionWindow = Window
  .partitionBy("id")
  .orderBy(desc("version"), desc("timestamp"))

val backfillWindow = partitionWindow
  .rowsBetween(0, Window.unboundedFollowing)

df.withColumn(
  "data_filled",
  coalesce(first("data_column", true).over(backfillWindow), col("data_column"))
)
My Observation
More thoughts: I'm doing one-directional backfilling, so it should be close to O(N log N), right? Maybe bi-directional filling (forward and backward at the same time) would behave differently.
Thank you!
Change the window specification from .rowsBetween(0, Window.unboundedFollowing) to .rowsBetween(Window.unboundedPreceding, 0), and reverse the logic for the bfill/ffill.
From testing, execution is much worse when the bfill/ffill code is specified as .rowsBetween(0, Window.unboundedFollowing) rather than .rowsBetween(Window.unboundedPreceding, 0), irrespective of whether you do backfill operations with F.first or forward-fill operations with F.last. The query execution plan shows that the WholeStageCodeGen takes a lot longer for the former than the latter. I'm not sure why under the hood, but I've found that making the change above improves execution by at least an order of magnitude, depending on the number of rows in your dataset.
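As a rough illustration (not the asker's original Scala code), here is a minimal PySpark sketch of the two window specifications for the question's columns; the id, version, timestamp, and data_column names are taken from the question, and df is assumed to be the input DataFrame:

import pyspark.sql.functions as F
from pyspark.sql import Window

# Original spec: sort descending and look from the current row to the end of the partition.
backfill_following = (
    Window.partitionBy("id")
    .orderBy(F.desc("version"), F.desc("timestamp"))
    .rowsBetween(0, Window.unboundedFollowing)
)

# Suggested spec: reverse the sort, look from the start of the partition to the current row,
# and take the last non-null value instead of the first.
backfill_preceding = (
    Window.partitionBy("id")
    .orderBy(F.asc("version"), F.asc("timestamp"))
    .rowsBetween(Window.unboundedPreceding, 0)
)

df_filled = df.withColumn(
    "data_filled", F.last("data_column", ignorenulls=True).over(backfill_preceding)
)

Both specs fill each null from the nearest older version; only the second keeps the window bounded on the preceding side.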
Testing
In PySpark, let's create some synthetic data with some null values.
import itertools as it

import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession, Window

spark = SparkSession.builder.master("local[*]").getOrCreate()
print(spark.version)

# Create synthetic data
data = list(it.product(range(30_000), ["ham", "eggs"]))

# Create null values: every x % 3 == 1 for ham and every x % 3 == 2 for eggs
_data = []
for _ix, t in enumerate(data):
    if ((t[1] == "ham") and ((t[0] % 3) == 1)) or ((t[1] == "eggs") and ((t[0] % 3) == 2)):
        _data.append((None, t[1], _ix))
    else:
        _data.append((*t, _ix))

sdf = spark.createDataFrame(_data, ["value", "name", "timestamp"]).select(
    "name", "timestamp", "value"
)
sdf.limit(20).show()
sdf.limit(20).show()
This creates the following Spark DataFrame (the first line of output is the printed Spark version):
3.4.1
+----+---------+-----+
|name|timestamp|value|
+----+---------+-----+
| ham| 0| 0|
|eggs| 1| 0|
| ham| 2| null|
|eggs| 3| 1|
| ham| 4| 2|
|eggs| 5| null|
| ham| 6| 3|
|eggs| 7| 3|
| ham| 8| null|
|eggs| 9| 4|
| ham| 10| 5|
|eggs| 11| null|
| ham| 12| 6|
|eggs| 13| 6|
| ham| 14| null|
|eggs| 15| 7|
| ham| 16| 8|
|eggs| 17| null|
| ham| 18| 9|
|eggs| 19| 9|
+----+---------+-----+
We define two different backfill operations, the first using .rowsBetween(0, Window.unboundedFollowing) and the second using .rowsBetween(Window.unboundedPreceding, 0).
def bfill_unbounded_following():
    window = (
        Window.partitionBy(["name"])
        .orderBy(F.asc("timestamp"))
        .rowsBetween(0, Window.unboundedFollowing)
    )
    return (F.first("value", ignorenulls=True).over(window)).alias("value_bfill_following")

def bfill_unbounded_preceding():
    window = (
        Window.partitionBy(["name"])
        .orderBy(F.desc("timestamp"))
        .rowsBetween(Window.unboundedPreceding, 0)
    )
    return (F.last("value", ignorenulls=True).over(window)).alias("value_bfill_preceding")
First, we check that both functions produce the same output. To make it easier to see the imputation, we show the eggs partition:
sdf_out = sdf.select(
    "*",
    bfill_unbounded_following(),
    bfill_unbounded_preceding(),
    (F.col("value_bfill_following") == F.col("value_bfill_preceding")).alias("check_equals"),
).sort("name", "timestamp")

assert sdf_out.where(~F.col("check_equals")).count() == 0
print("test passed")

sdf_out.limit(20).show()
The functions produce the same output, and the test has passed.
test passed
+----+---------+-----+---------------------+---------------------+------------+
|name|timestamp|value|value_bfill_following|value_bfill_preceding|check_equals|
+----+---------+-----+---------------------+---------------------+------------+
|eggs| 1| 0| 0| 0| true|
|eggs| 3| 1| 1| 1| true|
|eggs| 5| null| 3| 3| true|
|eggs| 7| 3| 3| 3| true|
|eggs| 9| 4| 4| 4| true|
|eggs| 11| null| 6| 6| true|
|eggs| 13| 6| 6| 6| true|
|eggs| 15| 7| 7| 7| true|
|eggs| 17| null| 9| 9| true|
|eggs| 19| 9| 9| 9| true|
|eggs| 21| 10| 10| 10| true|
|eggs| 23| null| 12| 12| true|
|eggs| 25| 12| 12| 12| true|
|eggs| 27| 13| 13| 13| true|
|eggs| 29| null| 15| 15| true|
|eggs| 31| 15| 15| 15| true|
|eggs| 33| 16| 16| 16| true|
|eggs| 35| null| 18| 18| true|
|eggs| 37| 18| 18| 18| true|
|eggs| 39| 19| 19| 19| true|
+----+---------+-----+---------------------+---------------------+------------+
Running bfill_unbounded_following() is 100x slower than bfill_unbounded_preceding()!
%%timeit -n 1
_ = sdf.select("*", bfill_unbounded_following()).collect()
22.1 s ± 289 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit -n 1
_ = sdf.select("*", bfill_unbounded_preceding()).collect()
191 ms ± 14.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Analysing the execution plan for the bfill_unbounded_following function, we can see where the bottleneck is: the WholeStageCodeGen (2) stage takes ~12 s to run (not shown here: the execution plan for bfill_unbounded_preceding, but it is considerably quicker).
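As an aside not in the original answer, you can print both physical plans yourself with DataFrame.explain (available in Spark 3.x); the detailed per-stage timings referred to above come from the SQL tab of the Spark UI:

# Print the formatted physical plan for each variant; the Window and
# WholeStageCodegen operators appear here.
sdf.select("*", bfill_unbounded_following()).explain(mode="formatted")
sdf.select("*", bfill_unbounded_preceding()).explain(mode="formatted")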
Now, I'm not exactly sure why there is such a large discrepancy. What is worse is that the execution time when specifying windows as .rowsBetween(0, Window.unboundedFollowing) grows non-linearly with the number of rows you have. For a 10k dataset it was 25x slower, and for a 30k dataset it was 100x slower (cf. the Testing section above).
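For reference, a rough sketch (not part of the original answer) of how the scaling observation could be reproduced, reusing the spark session, itertools import, and the two bfill functions defined above and timing a full collect() at a couple of dataset sizes:

import time

def make_sdf(n_rows):
    # Rebuild the synthetic data from the Testing section for an arbitrary size.
    raw = list(it.product(range(n_rows), ["ham", "eggs"]))
    rows = []
    for _ix, t in enumerate(raw):
        if ((t[1] == "ham") and ((t[0] % 3) == 1)) or ((t[1] == "eggs") and ((t[0] % 3) == 2)):
            rows.append((None, t[1], _ix))
        else:
            rows.append((*t, _ix))
    return spark.createDataFrame(rows, ["value", "name", "timestamp"]).select(
        "name", "timestamp", "value"
    )

for n_rows in (10_000, 30_000):
    sdf_n = make_sdf(n_rows)
    for fill in (bfill_unbounded_following, bfill_unbounded_preceding):
        start = time.perf_counter()
        sdf_n.select("*", fill()).collect()
        print(n_rows, fill.__name__, f"{time.perf_counter() - start:.1f} s")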