I have a PySpark dataframe like this but with a lot more data:
user_id | event_date |
---|---|
123 | '2024-01-01 14:45:12.00' |
123 | '2024-01-02 14:45:12.00' |
456 | '2024-01-01 14:45:12.00' |
456 | '2024-03-01 14:45:12.00' |
I drop duplicate users, keeping only each user's last event. I am using something like this:
df = df.orderBy(['user_id', 'event_date'], ascending=False).dropDuplicates(['user_id'])
While searching for a solution to another problem, I read that this approach may be non-deterministic. Am I doing it wrong? Should I use window functions instead?
When you call dropDuplicates() without passing any columns, it simply drops rows that are identical across all columns. You could call that "deterministic": the dropped rows and the kept row have the same values in every column, so it doesn't matter which one is kept.
The non-deterministic behaviour of dropDuplicates() when it is given a subset of columns is a known issue. Calling orderBy() first doesn't enforce determinism either, because the ordering is not guaranteed to be maintained once dropDuplicates() is applied, due to Spark's internals (partitioning, logical and physical plans, etc.).
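To make the point concrete, here is a toy model in plain Python. It is not Spark's implementation: the permutations only stand in for the arbitrary order in which rows may arrive once an earlier sort is no longer guaranteed, and the helper names `keep_first_seen` and `keep_latest` are made up for this sketch. A "keep whichever row shows up first" rule gives different answers depending on arrival order, while an explicit "keep the latest per user" rule does not.

```python
from itertools import permutations

# Sample rows from the question: (user_id, event_date).
rows = [
    (123, "2024-01-01 14:45:12.00"),
    (123, "2024-01-02 14:45:12.00"),
    (456, "2024-01-01 14:45:12.00"),
    (456, "2024-03-01 14:45:12.00"),
]


def keep_first_seen(ordered_rows):
    """Keep the first row encountered per user (order-dependent)."""
    kept = {}
    for user_id, event_date in ordered_rows:
        kept.setdefault(user_id, event_date)
    return kept


def keep_latest(ordered_rows):
    """Keep the row with the greatest event_date per user (order-independent)."""
    kept = {}
    for user_id, event_date in ordered_rows:
        if user_id not in kept or event_date > kept[user_id]:
            kept[user_id] = event_date
    return kept


# Try every possible arrival order and collect the distinct results.
first_seen_outcomes = {
    tuple(sorted(keep_first_seen(p).items())) for p in permutations(rows)
}
latest_outcomes = {
    tuple(sorted(keep_latest(p).items())) for p in permutations(rows)
}

print(len(first_seen_outcomes))  # more than one distinct result
print(len(latest_outcomes))      # exactly one distinct result
```

The second rule is what a window function with an explicit ordering expresses: the result is defined by the data, not by the order in which rows happen to be processed.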
It's recommended to use window functions instead:
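from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Rank each user's events from newest to oldest.
# If two events of one user can share the same event_date, add another column to orderBy() as a tie-breaker.
window_spec = Window.partitionBy("user_id").orderBy(F.col("event_date").desc())
df_with_rank = df.withColumn("row_number", F.row_number().over(window_spec))
# Keep only the newest event per user, then drop the helper column.
df_last_event = df_with_rank.filter(F.col("row_number") == 1).drop("row_number")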