databricks azure-databricks delta-live-tables scd

Databricks DLT CDC/SCD - Taking the latest ID per day


Hi, I'm creating a DLT pipeline that uses DLT CDC to implement SCD Type 1, keeping the latest record per Id based on a datetime column. This works with no issues:

import dlt
from pyspark.sql.functions import col

# Streaming view over the source table
@dlt.view
def users():
  return spark.readStream.table("source_table")

dlt.create_streaming_table("target_table")

dlt.apply_changes(
  target = "target_table",
  source = "users",  # name of the view defined above
  keys = ["Id"],
  sequence_by = col("PublishDateTime"),
  stored_as_scd_type = 1
)

This gives me the following result:

SOURCE

Id   DateTime
123  100424 1717
123  100424 1710
164  100424 1704
167  100424 1619

TARGET

Id   DateTime
123  100424 1717
164  100424 1704
167  100424 1619

Essentially, it takes the latest record per Id, ordered by the DateTime field.

My question now is: how do I edit this code to take the LATEST record PER DAY? Please see the example below, using the same table:

SOURCE

Id   DateTime     Date
123  100424 1717  100424
123  100424 1710  100424
123  110424 1717  110424
164  100424 1704  100424
164  110424 1728  110424
165  120424 1447  120424
165  120424 1316  120424

TARGET

Id   DateTime     Date
123  100424 1717  100424
123  110424 1717  110424
164  100424 1704  100424
164  110424 1728  110424
165  120424 1447  120424

As you can see, the target table keeps the latest record per Id by DateTime, but for EACH DAY rather than only the most recent one overall.

I'm aware that SCD Type 1 does not capture history, so it may not be the right option here. SCD Type 2 does capture history, but I'm unsure how to implement this with it. I would be eternally grateful for any advice here, thanks.


Solution

  • Include the Date column in the keys of apply_changes, so that both Id and Date identify a record. The target table then holds one row per Id per day, each carrying the latest DateTime value for that day.

    Code:

    import dlt
    import pyspark.sql.functions as F
    
    @dlt.view()
    def stg_table():
        df = spark.readStream.table("my_delta_table")
        return df
    
    dlt.create_streaming_table("target_table")
    
    dlt.apply_changes(
      target = "target_table",
      source = "stg_table",
      keys = ["Id","Date"],
      sequence_by = F.col("DateTime"),
      stored_as_scd_type = 1
    )
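
One caveat worth checking: sequence_by needs a column whose ordering is chronological. The sketch below is plain Python rather than pipeline code, and it assumes (this is not stated in the question) that DateTime is stored as a string shaped like "100424 1717", i.e. ddmmyy followed by HHMM. Under that assumption, string comparison can order rows wrongly across months, while parsed values order correctly. The names ASSUMED_FORMAT and parse_sequence are made up for illustration.

```python
from datetime import datetime

# Assumption: DateTime values look like "100424 1717" (ddmmyy, then HHMM).
ASSUMED_FORMAT = "%d%m%y %H%M"


def parse_sequence(value):
    """Parse the assumed string layout into a datetime that sorts chronologically."""
    return datetime.strptime(value, ASSUMED_FORMAT)


# Two hypothetical values, one day apart across a month boundary.
april_30 = "300424 0900"
may_1 = "010524 0900"

# Compared as strings, 30 April wrongly looks later because "3" > "0".
string_order_says_april_is_later = april_30 > may_1

# Compared as datetimes, 1 May is correctly the later value.
parsed_order_says_may_is_later = parse_sequence(may_1) > parse_sequence(april_30)

# The per-day key can be derived from the same parsed value.
day_key = parse_sequence(may_1).date()

print(string_order_says_april_is_later, parsed_order_says_may_is_later, day_key)
```

If the column really is a string like this, the same conversion could presumably be done inside the view with pyspark.sql.functions.to_timestamp and to_date using a matching format pattern, before the data reaches apply_changes. If the column is already a timestamp, none of this is needed.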
    

    I used the same sample data you provided.

    Output:

    Got 5 records in target table.
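
The count of 5 can be sanity-checked without a pipeline. The following is a minimal plain-Python sketch that imitates "keep the row with the greatest sequence value per key" on the sample data from the question. It is an illustrative stand-in, not how DLT implements apply_changes, and the helper name latest_per_key and the ddmmyy HHMM parsing format are assumptions.

```python
from datetime import datetime

# Sample rows from the question as (Id, DateTime, Date).
# Assumption: DateTime is ddmmyy followed by HHMM.
SOURCE_ROWS = [
    (123, "100424 1717", "100424"),
    (123, "100424 1710", "100424"),
    (123, "110424 1717", "110424"),
    (164, "100424 1704", "100424"),
    (164, "110424 1728", "110424"),
    (165, "120424 1447", "120424"),
    (165, "120424 1316", "120424"),
]


def latest_per_key(rows, key_indexes):
    """Keep one row per key: the one with the greatest parsed DateTime."""
    latest = {}
    for row in rows:
        key = tuple(row[i] for i in key_indexes)
        seq = datetime.strptime(row[1], "%d%m%y %H%M")
        # Overwrite only when this row is newer than the stored one.
        if key not in latest or seq > latest[key][0]:
            latest[key] = (seq, row)
    return sorted(row for _, row in latest.values())


# Key on Id only: one row per Id (the original behaviour).
by_id = latest_per_key(SOURCE_ROWS, key_indexes=[0])

# Key on Id and Date: one row per Id per day (the behaviour asked for).
by_id_and_date = latest_per_key(SOURCE_ROWS, key_indexes=[0, 2])

print(len(by_id), len(by_id_and_date))
for row in by_id_and_date:
    print(row)
```

With only Id as the key, each Id collapses to a single row. Adding Date to the key keeps one row per Id per day, which matches the TARGET table in the question.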
