Tags: pyspark, azure-synapse

SCD Type 2 Issue


I am trying to implement SCD Type 2 on a Delta Lake table. I am using the following code, but it is not working as expected.

I have AssetId as my primary key (and merge condition). I am able to close (mark as inactive) the existing record when the AssetName changes; however, I am not able to insert the new record with the updated AssetName.

Has anybody faced this challenge?

Here is the code:

from pyspark.sql.functions import lit, current_timestamp

output_columns = ['AssetId', 'AssetName', 'AssetCode']
merge_condition = 'AssetId'

merge_builder = target_table.alias("target").merge(
    source=source_df.alias("source"),
    condition=f"target.{merge_condition} = source.{merge_condition}"
)


# Update matched records: Mark old records as inactive
merge_builder = merge_builder.whenMatchedUpdate(
    condition="target.is_current = true AND (" +
              " OR ".join([f"target.{col} != source.{col}" for col in output_columns]) +
              ")",
    set={
        "is_current": lit(False),
        "end_date": current_timestamp()
    }
)

# Insert unmatched records: Add new rows for changes or new entries
insert_expr = {col: f"source.{col}" for col in output_columns}

insert_expr.update({
    "is_current": lit(True),
    "effective_date": current_timestamp(),
    "end_date": lit(None)
})


merge_builder = merge_builder.whenNotMatchedInsert(values=insert_expr)

# Execute the merge
merge_builder.execute()
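
For reference, the merge above assumes the target Delta table already carries the SCD Type 2 bookkeeping columns referenced in the code (is_current, effective_date, end_date) alongside the asset attributes. A minimal setup sketch, assuming the usual spark session and using the table name asset_dim purely for illustration, could look like this:

from delta.tables import DeltaTable

# Hypothetical one-time setup; the column names follow the merge code above,
# the table name is an assumption
spark.sql("""
    CREATE TABLE IF NOT EXISTS asset_dim (
        AssetId        INT,
        AssetName      STRING,
        AssetCode      STRING,
        is_current     BOOLEAN,
        effective_date TIMESTAMP,
        end_date       TIMESTAMP
    ) USING DELTA
""")

target_table = DeltaTable.forName(spark, "asset_dim")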

Solution

  • I wrote this function a while back; here I have customized it for this question.

    It has no is_current flag. Instead it treats effective_end_time IS NULL as "current", which serves the same purpose.

    The reason your insert never happens: a changed row still matches on AssetId, so only the whenMatched* clauses can fire for it, and whenNotMatchedInsert is skipped. The workaround below stages each changed row twice, once with a NULL mergeKey (which cannot match anything, so it is inserted as the new version) and once with the real key (which closes the old version).

    Add test data to your post if you have trouble.

    from datetime import datetime

    from pyspark.sql import DataFrame
    from pyspark.sql import functions as F
    from delta.tables import DeltaTable

    # DeltaTable.forName needs the active SparkSession as its first argument
    target_table = DeltaTable.forName(spark, 'target_delta_table')
    output_columns = ['AssetId', 'AssetName', 'AssetCode']
    merge_condition = 'AssetId'
    
    
    def scd_type_2_merge(df_new: DataFrame):
        """
        Performs an SCD Type 2 merge from df_new to target_table
        """
    
        df_new = df_new.withColumn('effective_start_time', F.lit(datetime.now()))
        attrs_are_different = ' OR '.join([f'NOT updates.{a} <=> target.{a}' for a in output_columns])
    
        # Rows that supersede an existing current row (those existing rows reach end of life)
        df_eol = (
            df_new.alias('updates')
            .join(target_table.toDF().alias('target'), f'{merge_condition}')
            .where(f'target.effective_end_time IS null AND ({attrs_are_different})')
            .selectExpr('updates.*')
        )
    
        # union() resolves columns by position, not by name, so the two DataFrames being
        # unioned must have the same column order or the data will get misplaced.
        union_col_order = df_eol.selectExpr('NULL as mergeKey', '*').columns
        staged_updates = (
            # New versions of changed rows: mergeKey is NULL so they never match the
            # target and are inserted by the whenNotMatchedInsert clause
            df_eol.selectExpr('NULL as mergeKey', '*').select(union_col_order)
            # All incoming rows keyed on the real merge key: changed rows close the old
            # version via whenMatchedUpdate, brand-new keys fall through to whenNotMatchedInsert
            .union(df_new.selectExpr(f'{merge_condition} as mergeKey', '*').select(union_col_order))
        )
    
        # Column mapping for rows inserted by whenNotMatchedInsert
        insert_mapping = {
            f'{merge_condition}': f'updates.{merge_condition}',
            'effective_start_time': 'updates.effective_start_time',
            'effective_end_time': 'null',
        }
        insert_mapping.update({attr_col: f'updates.{attr_col}' for attr_col in output_columns})
    
        # SCD merge
        target_table.alias('target').merge(
            staged_updates.alias('updates'), f'target.{merge_condition} = mergeKey'
        ).whenMatchedUpdate(
            # compare attrs for idempotency
            condition=f'target.effective_end_time IS null AND ({attrs_are_different})',
            set={'effective_end_time': 'updates.effective_start_time'},
        ).whenNotMatchedInsert(
            values=insert_mapping
        ).execute()
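
    Calling the function is then just a matter of building the incoming DataFrame and passing it in. A small usage sketch, where the sample rows and values are purely hypothetical:

    # Hypothetical incoming batch; in practice this comes from your source system
    df_incoming = spark.createDataFrame(
        [(1, 'Pump A', 'PMP-001'), (2, 'Valve B', 'VLV-002')],
        ['AssetId', 'AssetName', 'AssetCode'],
    )

    scd_type_2_merge(df_incoming)

    Re-running the same batch is a no-op: unchanged rows fail the attrs_are_different check in whenMatchedUpdate and never appear in df_eol, so nothing is updated or inserted a second time.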