azure-data-factory, parameter-passing, azure-data-lake-gen2, change-data-capture

How to Configure CDC in Azure Data Flow with Parameterized Datasets for Daily Partition Processing?


I am fairly new to Azure Data Factory (ADF) but have been learning and experimenting with some of its advanced features. I'm currently working on a use case involving Change Data Capture (CDC) and partitioned folders in Azure Data Lake Storage (ADLS). I need some guidance on how to configure this properly.

My Setup

I am creating a Data Flow in ADF that processes daily partitioned files in ADLS. Here’s how it works:

  1. Source Data:

    • Located in a "raw" layer, with folders partitioned by date, e.g., {file_system}/raw/supplier/20241116.

    • The file within the partition is named data.parquet.

  2. Transformations:

    • The Data Flow applies transformations to this data.

  3. Destination Data:

    • The transformed data is written to a "curated" layer with a similar partitioning structure, e.g., {file_system}/curated/supplier/20241116.

Dynamic Configuration

I’ve parameterized the datasets used in the Data Flow as follows:
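
A trimmed sketch of the raw dataset (the dataset name is a placeholder, the linked service is omitted, and the curated dataset is parameterized the same way):

      {
        "name": "ds_supplier_raw",
        "properties": {
          "type": "Parquet",
          "parameters": { "p_partition_folder": { "type": "string" } },
          "typeProperties": {
            "location": {
              "type": "AzureBlobFSLocation",
              "fileSystem": "<file_system>",
              "folderPath": {
                "value": "@concat('raw/supplier/', dataset().p_partition_folder)",
                "type": "Expression"
              },
              "fileName": "data.parquet"
            }
          }
        }
      }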

The values for p_partition_folder are derived from a parent pipeline, which includes:

  1. Lookup Activity:

    • Reads a JSON file that contains:

      { "current_partition_date": "20241116", "next_partition_date": "20241117" }

    • current_partition_date refers to the latest partition in the curated layer (the baseline for CDC).

    • next_partition_date refers to the partition to which the current run’s transformed data should be written.

  2. Set Variable Activities:

    • Extract the current_partition_date and next_partition_date values from the Lookup activity output (see the expression sketch after this list).

  3. Data Flow Activity:

    • Passes these parameters (current_partition_date and next_partition_date) into the Data Flow for dynamic path resolution.
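
For reference, the Set Variable expressions read the two fields from the Lookup's first row (First row only is enabled; the Lookup activity name LookupPartitionConfig below is a placeholder):

      Set variable current_partition_date:
          @activity('LookupPartitionConfig').output.firstRow.current_partition_date

      Set variable next_partition_date:
          @activity('LookupPartitionConfig').output.firstRow.next_partition_date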

The Problem

In the Data Flow, I want to:

  1. For the current run:

    • Compare the latest raw data (e.g., {file_system}/raw/supplier/20241117/data.parquet) with the baseline data from the curated layer (e.g., {file_system}/curated/supplier/20241116/data_today.parquet).

  2. Identify changes (inserts, updates, deletes) between these files using CDC.

  3. Write the results to the "curated" layer for the next_partition_date (e.g., {file_system}/curated/supplier/20241117).

However, I am confused about a few points.

Questions

  1. How do I configure the CDC-enabled source transformation in the Data Flow to handle the dynamic dataset parameters (current_partition_date and next_partition_date) for file paths?

  2. For the very first run, when no baseline file exists yet in the curated layer, how should the CDC handle the absence of a previous partition (i.e., treat all records as inserts)?

  3. Is there a better approach to achieve this entire workflow compared to what I’ve described?

Any guidance or best practices would be greatly appreciated. Thank you in advance!


Solution

  • As per the documentation,

    Change Data Capture (CDC) copies all the source data to the target in the first run; from the next run onward, it only compares the source rows it has already processed with the current source rows and returns the modified rows. It does not compare the source data rows with the target data rows.

    So, you can't compare the current source file with the target file from the previous run in this case. Even if you try CDC on the current and previous source files by applying a wildcard file path, CDC on files will always return all the rows from any newly loaded or newly modified file. AFAIK, CDC on files works from the files' last-modified time; it does not compare the rows between files.

    As your requirement is to compare the previous target file with the current source file, you can try the workaround below. Note that it requires a key column with a value for every row.

    For the first run, you need to copy all the source data to the target file as-is. From the second pipeline run onward (once a target file exists), you can follow the approach below.
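
    If you want to automate that first-run branch instead of handling it manually, one option (a sketch on my part; the activity names are placeholders) is a Get Metadata activity on the parameterized curated dataset with the Exists field selected, followed by an If Condition:

        Get Metadata activity 'CheckBaseline':
            dataset parameter p_partition_folder = @variables('current_partition_date')
            Field list: Exists

        If Condition expression:
            @activity('CheckBaseline').output.exists

        True activities  -> the compare data flow described below
        False activities -> a plain Copy activity from the raw partition to the curated partition (all rows land as inserts)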

    Here, you need to consider three types of rows:

    1. Rows that are updated in the new file when compared to the previous target rows.
    2. New rows in the new file.
    3. Deleted rows from the last target file.

    Take two sources in the data flow. Use your parameterized source dataset for one source and the parameterized target dataset for the other. If there are multiple files in your source folder {file_system}/raw/supplier/20241117 and you only want the latest files' data, you can enable CDC here. Import the projection on both sources by giving sample values for the dataset parameters.

    1. Updated rows:

      For the updated rows, use an Exists transformation with Source1 as the left stream and Source2 as the right stream, and select the Exists option. Use the key column from both the left and right streams in the exists condition.


      This will return the common rows, with updated values if any.

    2. New rows:

      For the new rows, take another Exists transformation with Source1 and Source2 as the left and right streams, select the Doesn't exist option, and use the same key columns as above.


    3. Deleted rows:

      For the deleted rows, take one more Exists transformation, reverse the streams (Source2 as the left stream and Source1 as the right), keep the Doesn't exist option, and use the same key columns. This returns rows that exist in the previous target file but are missing from the new source file.


    In this way, you can identify the inserts, updates, and deletes by comparing the current source and previous target files' data. You can apply your own logic on top of these transformations as per your requirement and add your target dataset as the sink (a script sketch follows below).
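
    Putting these steps together, the core of the data flow script would look roughly like the sketch below (CurrentRaw, PreviousCurated, and the supplier_id key column are placeholder names, and the imported projections are trimmed to the key column):

        source(output(supplier_id as string),
            allowSchemaDrift: true,
            validateSchema: false,
            ignoreNoFilesFound: false) ~> CurrentRaw
        source(output(supplier_id as string),
            allowSchemaDrift: true,
            validateSchema: false,
            ignoreNoFilesFound: false) ~> PreviousCurated
        CurrentRaw, PreviousCurated exists(CurrentRaw.supplier_id == PreviousCurated.supplier_id,
            negate:false,
            broadcast: 'auto') ~> UpdatedRows
        CurrentRaw, PreviousCurated exists(CurrentRaw.supplier_id == PreviousCurated.supplier_id,
            negate:true,
            broadcast: 'auto') ~> NewRows
        PreviousCurated, CurrentRaw exists(PreviousCurated.supplier_id == CurrentRaw.supplier_id,
            negate:true,
            broadcast: 'auto') ~> DeletedRows

    Keep in mind that UpdatedRows returns every matching row, changed or not, so you may want a filter or column comparison downstream before unioning the streams into the sink.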

    In the Data Flow activity, pass the values for the dataset parameters.

    For the above sample, I passed hard-coded sample partition dates for these parameters; in your pipeline, pass your expressions for these values instead.
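
    As a sketch (assuming the variable names from your pipeline, a single p_partition_folder parameter on each dataset, and that the latest raw partition matches next_partition_date), the Data Flow activity values would be along these lines:

        raw source dataset        p_partition_folder:  @variables('next_partition_date')
        curated baseline source   p_partition_folder:  @variables('current_partition_date')
        curated sink dataset      p_partition_folder:  @variables('next_partition_date')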