Hi, I'm creating a DLT pipeline that uses DLT CDC (`apply_changes`) to implement SCD Type 1, keeping the latest record per key based on a datetime column. This works without issues:
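```python
import dlt
from pyspark.sql.functions import col

# `spark` is provided by the Databricks runtime inside a DLT pipeline.
@dlt.view
def users():
    return spark.readStream.table("source_table")

dlt.create_streaming_table("target_table")

# SCD Type 1: one row per Id, keeping the row with the latest PublishDateTime.
dlt.apply_changes(
    target = "target_table",
    source = "source_table",
    keys = ["Id"],
    sequence_by = col("PublishDateTime"),
    stored_as_scd_type = 1
)
```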
This gives me the following result:
SOURCE
Id | DateTime |
---|---|
123 | 100424 1717 |
123 | 100424 1710 |
164 | 100424 1704 |
167 | 100424 1619 |
TARGET
Id | DateTime |
---|---|
123 | 100424 1717 |
164 | 100424 1704 |
167 | 100424 1619 |
In other words, it keeps the latest record for each Id, ordered by the DateTime field.
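To make the SCD Type 1 behaviour concrete, here is a small plain-Python model of what `apply_changes(keys=["Id"], sequence_by=...)` keeps: for each key, the row with the greatest sequence value wins. This is only a sketch of the semantics, not how DLT implements it. It assumes the DateTime strings use the format `ddMMyy HHmm` (e.g. `100424 1717` = 10 April 2024, 17:17), and the helper names `parse_dt` and `scd1_latest` are hypothetical.

```python
from datetime import datetime

# Sample source rows from the question: (Id, DateTime).
SOURCE = [
    (123, "100424 1717"),
    (123, "100424 1710"),
    (164, "100424 1704"),
    (167, "100424 1619"),
]


def parse_dt(value: str) -> datetime:
    """Parse a DateTime string, ASSUMING the format ddMMyy HHmm."""
    return datetime.strptime(value, "%d%m%y %H%M")


def scd1_latest(rows, key_fn):
    """Keep one row per key: the one with the latest DateTime (SCD Type 1 style)."""
    latest = {}
    for row in rows:
        key = key_fn(row)
        if key not in latest or parse_dt(row[1]) > parse_dt(latest[key][1]):
            latest[key] = row
    return sorted(latest.values())


# keys = ["Id"]: only the newest row per Id survives.
target = scd1_latest(SOURCE, key_fn=lambda row: row[0])
print(target)
# [(123, '100424 1717'), (164, '100424 1704'), (167, '100424 1619')]
```

Because the key is only Id, the older 123 row (17:10) is overwritten, which is exactly why the per-day history is lost.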
My question is: how do I change this code to keep the LATEST record PER DAY? Here is an example using the same table:
SOURCE
Id | DateTime | Date |
---|---|---|
123 | 100424 1717 | 100424 |
123 | 100424 1710 | 100424 |
123 | 110424 1717 | 110424 |
164 | 100424 1704 | 100424 |
164 | 110424 1728 | 110424 |
165 | 120424 1447 | 120424 |
165 | 120424 1316 | 120424 |
TARGET
Id | DateTime | Date |
---|---|---|
123 | 100424 1717 | 100424 |
123 | 110424 1717 | 110424 |
164 | 100424 1704 | 100424 |
164 | 110424 1728 | 110424 |
165 | 120424 1447 | 120424 |
As you can see, the target table keeps the latest record for each Id by DateTime, but for EACH DAY, not just the latest one overall.
I'm aware that SCD Type 1 does not capture history, so it may not be the right option here. SCD Type 2 does, but I'm unsure how to implement it. I'd be eternally grateful for any advice, thanks!
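For context on the SCD Type 2 route mentioned above: as I understand it, a DLT SCD Type 2 target keeps every version of a key, with `__START_AT`/`__END_AT` columns marking when each version was current. The plain-Python sketch below models that history for `keys=["Id"]`, assuming every source row becomes its own version (whether DLT opens a new version when only the sequence column changes is not confirmed here). It then shows that "latest per day" corresponds to the versions that are either still open or are superseded on a later day. It models the idea only; `scd2_history` and `latest_per_day` are hypothetical names, and the `ddMMyy HHmm` format is an assumption.

```python
from datetime import datetime

# Sample source rows from the question: (Id, DateTime).
SOURCE = [
    (123, "100424 1717"),
    (123, "100424 1710"),
    (123, "110424 1717"),
    (164, "100424 1704"),
    (164, "110424 1728"),
    (165, "120424 1447"),
    (165, "120424 1316"),
]


def parse_dt(value):
    """Parse a DateTime string, ASSUMING the format ddMMyy HHmm."""
    return datetime.strptime(value, "%d%m%y %H%M")


def scd2_history(rows):
    """Model SCD Type 2: one version per row, with start/end validity per Id.

    ASSUMPTION: every source row becomes its own version; end_at is the
    start of the next version for the same Id, or None if still current.
    """
    by_id = {}
    for row_id, dt in rows:
        by_id.setdefault(row_id, []).append(parse_dt(dt))
    history = []
    for row_id, starts in sorted(by_id.items()):
        starts.sort()
        for i, start_at in enumerate(starts):
            end_at = starts[i + 1] if i + 1 < len(starts) else None
            history.append((row_id, start_at, end_at))
    return history


def latest_per_day(history):
    """A version is the last of its day if it is still open or ends on a later day."""
    return [
        (row_id, start_at.strftime("%d%m%y %H%M"))
        for row_id, start_at, end_at in history
        if end_at is None or end_at.date() > start_at.date()
    ]


result = latest_per_day(scd2_history(SOURCE))
print(result)
```

Filtering the full history this way yields the same five rows as the expected TARGET table, so SCD Type 2 plus a query over the history is one possible route; the trade-off is an extra filtering step on top of the table.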
Add the Date column to the `keys` of `apply_changes` as well, so that it treats each Id and Date combination as its own key. It then returns one record per Id per day, holding the latest values by DateTime.
Code:
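```python
import dlt
import pyspark.sql.functions as F

# Streaming view over the source Delta table
# (`spark` is provided by the Databricks runtime).
@dlt.view()
def stg_table():
    df = spark.readStream.table("my_delta_table")
    return df

dlt.create_streaming_table("target_table")

# Composite key (Id, Date): one SCD Type 1 row per Id per day,
# keeping the row with the latest DateTime.
dlt.apply_changes(
    target = "target_table",
    source = "stg_table",
    keys = ["Id", "Date"],
    sequence_by = F.col("DateTime"),
    stored_as_scd_type = 1
)
```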
I used the same sample data you provided.
Output:
The target table contains 5 records.
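To sanity-check the 5-record result without running a pipeline, here is a plain-Python model of SCD Type 1 with the composite key (Id, Date), using the sample rows from the question. It only models the semantics of `keys=["Id", "Date"]` with `sequence_by` on DateTime; it is not DLT code. It assumes DateTime uses the format `ddMMyy HHmm`, and `latest_per_key` is a hypothetical helper.

```python
from datetime import datetime

# Sample source rows from the question: (Id, DateTime, Date).
SOURCE = [
    (123, "100424 1717", "100424"),
    (123, "100424 1710", "100424"),
    (123, "110424 1717", "110424"),
    (164, "100424 1704", "100424"),
    (164, "110424 1728", "110424"),
    (165, "120424 1447", "120424"),
    (165, "120424 1316", "120424"),
]


def parse_dt(value):
    """Parse a DateTime string, ASSUMING the format ddMMyy HHmm."""
    return datetime.strptime(value, "%d%m%y %H%M")


def latest_per_key(rows, key_fn):
    """SCD Type 1 model: per key, keep the row with the latest DateTime."""
    latest = {}
    for row in rows:
        key = key_fn(row)
        if key not in latest or parse_dt(row[1]) > parse_dt(latest[key][1]):
            latest[key] = row
    return latest


# keys = ["Id", "Date"]: a separate SCD Type 1 "slot" per Id per day.
target = latest_per_key(SOURCE, key_fn=lambda row: (row[0], row[2]))
for row in sorted(target.values()):
    print(row)
print(len(target), "records")  # 5 records
```

Adding Date to the key is what stops the 11 April row for Id 123 from overwriting the 10 April one, while the 17:10 row on 10 April is still replaced by the 17:17 row.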