I am trying to implement SCD Type 2 with Delta Lake using the following code, but it's not working as expected.
I have AssetId as my primary key (merge condition). When the AssetName changes I am able to close the existing record (mark it as inactive), but I am not able to insert the new record with the updated AssetName.
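For example (hypothetical data), suppose the target currently holds:

AssetId | AssetName | AssetCode | is_current | end_date
1       | Pump A    | AC-01     | true       | null

After merging a source row (1, 'Pump B', 'AC-01'), I expect the old version to be closed and a new current version inserted:

AssetId | AssetName | AssetCode | is_current | end_date
1       | Pump A    | AC-01     | false      | <merge time>
1       | Pump B    | AC-01     | true       | null

The close happens, but the second (insert) row never appears.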
Has anybody faced this challenge?
Here is the code:
from pyspark.sql.functions import current_timestamp, lit

output_columns = ['AssetId', 'AssetName', 'AssetCode']
merge_condition = 'AssetId'

# target_table is a DeltaTable and source_df a DataFrame, defined elsewhere
merge_builder = target_table.alias("target").merge(
    source=source_df.alias("source"),
    condition=f"target.{merge_condition} = source.{merge_condition}"
)

# Update matched records: mark old records as inactive
merge_builder = merge_builder.whenMatchedUpdate(
    condition="target.is_current = true AND ("
              + " OR ".join([f"target.{col} != source.{col}" for col in output_columns])
              + ")",
    set={
        "is_current": lit(False),
        "end_date": current_timestamp()
    }
)

# Insert unmatched records: add new rows for changes or new entries
insert_expr = {col: f"source.{col}" for col in output_columns}
insert_expr.update({
    "is_current": lit(True),
    "effective_date": current_timestamp(),
    "end_date": lit(None)
})
merge_builder = merge_builder.whenNotMatchedInsert(values=insert_expr)

# Execute the merge
merge_builder.execute()
I wrote a function for this a while back; here I have customized it for your question. The reason your insert never happens is that a changed row still matches the merge condition on AssetId, so whenNotMatchedInsert never fires for it; the trick below is to stage a second copy of each changed row with a NULL mergeKey, so that copy falls through to the insert clause. My version has no is_current column; instead it treats effective_end_time IS NULL as "current". Add test data to your post if you still have trouble.
from datetime import datetime

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Assumes an active SparkSession named `spark`
target_table = DeltaTable.forName(spark, 'target_delta_table')
output_columns = ['AssetId', 'AssetName', 'AssetCode']
merge_condition = 'AssetId'

def scd_type_2_merge(df_new: DataFrame):
    """
    Performs an SCD Type 2 merge from df_new into target_table.
    """
    df_new = df_new.withColumn('effective_start_time', F.lit(datetime.now()))
    # Null-safe comparison (<=>) so changes to/from NULL are detected too
    attrs_are_different = ' OR '.join(f'NOT updates.{a} <=> target.{a}' for a in output_columns)

    # End-of-life rows: incoming rows that are new versions of existing
    # current rows, so the existing rows must be closed out
    df_eol = (
        df_new.alias('updates')
        .join(target_table.toDF().alias('target'), merge_condition)
        .where(f'target.effective_end_time IS NULL AND ({attrs_are_different})')
        .selectExpr('updates.*')
    )

    # union() resolves columns by position, not by name, so both sides
    # must have the same column order or the data will get misplaced.
    union_col_order = df_eol.selectExpr('NULL as mergeKey', '*').columns
    staged_updates = (
        # mergeKey is NULL, so these rows never match and are inserted as
        # the new current versions by the whenNotMatchedInsert clause
        df_eol.selectExpr('NULL as mergeKey', '*').select(union_col_order)
        # Rows that are brand new, or that match and close out the old
        # version in the whenMatchedUpdate clause
        .union(df_new.selectExpr(f'{merge_condition} as mergeKey', '*').select(union_col_order))
    )

    # Column mapping for newly inserted rows
    insert_mapping = {
        merge_condition: f'updates.{merge_condition}',
        'effective_start_time': 'updates.effective_start_time',
        'effective_end_time': 'null',
    }
    insert_mapping.update({attr_col: f'updates.{attr_col}' for attr_col in output_columns})

    # SCD merge
    target_table.alias('target').merge(
        staged_updates.alias('updates'), f'target.{merge_condition} = mergeKey'
    ).whenMatchedUpdate(
        # compare attrs for idempotency
        condition=f'target.effective_end_time IS NULL AND ({attrs_are_different})',
        set={'effective_end_time': 'updates.effective_start_time'},
    ).whenNotMatchedInsert(
        values=insert_mapping
    ).execute()
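Here is a minimal usage sketch (the table name, the SparkSession variable spark, and the sample values are assumptions for illustration):

# Hypothetical example: stage one changed asset and merge it in
df_new = spark.createDataFrame(
    [(1, 'Pump B', 'AC-01')],
    ['AssetId', 'AssetName', 'AssetCode'],
)
scd_type_2_merge(df_new)
# The existing current row for AssetId 1 gets effective_end_time set to
# the merge time; a new row with effective_end_time IS NULL is inserted.

Because attrs_are_different uses the null-safe <=> comparison, re-running the merge with the same df_new is a no-op, which makes the operation idempotent.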