I have a data connection source that creates two datasets. The two datasets pull from the same source: dataset X consists of the current state of all rows in the source table, and dataset Y pulls all rows that have been updated since the last build. These two datasets are then merged downstream into dataset Z, with dataset Z being either dataset X or the most recent version of each row from dataset Y. This allows us to both have low-latency updates and maintain good partitioning.
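For context, a minimal sketch of what that downstream merge might look like, assuming both datasets share a schema with a primary key column "id" and a timestamp column "updated_at" (placeholder names, not the actual pipeline's columns):

from pyspark.sql import Window
import pyspark.sql.functions as F

# keep only the newest version of each row that Y has picked up since the last build
window = Window.partitionBy("id").orderBy(F.col("updated_at").desc())
latest_Y = (
    Y.withColumn("rn", F.row_number().over(window))
     .filter(F.col("rn") == 1)
     .drop("rn")
)
# take the updated row from Y where one exists, otherwise fall back to the snapshot X
Z = X.join(latest_Y, on="id", how="left_anti").unionByName(latest_Y)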
When rows are deleted in the source table, the rows are no longer present in dataset X but are still present in dataset Y.
What would be the best way to keep these 'deleted' rows in dataset Z? Ideally I would also be able to snapshot dataset Y without losing any of the 'deleted' rows.
Good question! From what I understand, you want dataset Z to have only the most up-to-date rows, including the most up-to-date 'deleted' rows, and both updated rows and deleted rows are present in Y. In this case, I would suggest first unioning Y and X together, so that all rows, including deleted rows, are present in the union dataset. Then use a window function over a date column to get the most recent version of each row. Here is an outline of the PySpark code I would suggest for this:
from pyspark.sql import Window
import pyspark.sql.functions as F

primary_keys = ["id"]       # assumed key column(s); adjust to the source table's primary key
date_column = "updated_at"  # assumed column recording when each row was created/updated
window = Window.partitionBy(primary_keys).orderBy(F.col(date_column).desc())
Z = X.unionByName(Y)  # union to get all rows, including deleted ones
Z = Z.withColumn("row_num", F.row_number().over(window))  # rank each key's versions by date, newest first
Z = Z.filter(F.col("row_num") == 1).drop("row_num")  # keep only the latest version of each row
Note that this solution does not get around the issue of what happens if Y snapshots.