I am fairly new to Azure Data Factory (ADF) but have been learning and experimenting with some of its advanced features. I'm currently working on a use case involving Change Data Capture (CDC) and partitioned folders in Azure Data Lake Storage (ADLS). I need some guidance on how to configure this properly.
I am creating a Data Flow in ADF that processes daily partitioned files in ADLS. Here’s how it works:
Source Data:
- Located in a "raw" layer, with folders partitioned by date, e.g., `{file_system}/raw/supplier/20241116`.
- The file within each partition is named `data.parquet`.

Transformations:

Destination Data:
- `{file_system}/curated/supplier/20241116`

I've parameterized the datasets used in the Data Flow as follows:

Parameters:
- `p_DirectoryName` for the base directory path (e.g., `/raw/supplier` or `/curated/supplier`).
- `p_partition_folder` for the partition folder name (e.g., `20241116`).

File Path: Constructed dynamically as:
`@concat(dataset().DirectoryName, '/', dataset().partition_folder)`
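For reference, a simplified sketch of the parameterized Parquet dataset (an ADLS Gen2 location; the dataset and linked service names are placeholders, and I've used the `p_`-prefixed parameter names consistently here):

```json
{
  "name": "ds_supplier_parquet",
  "properties": {
    "type": "Parquet",
    "linkedServiceName": {
      "referenceName": "ls_adls",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "p_DirectoryName": { "type": "string" },
      "p_partition_folder": { "type": "string" }
    },
    "typeProperties": {
      "location": {
        "type": "AzureBlobFSLocation",
        "fileSystem": "my-file-system",
        "folderPath": {
          "value": "@concat(dataset().p_DirectoryName, '/', dataset().p_partition_folder)",
          "type": "Expression"
        },
        "fileName": "data.parquet"
      }
    }
  }
}
```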
The values for `p_partition_folder` are derived from a parent pipeline, which includes:

Lookup Activity:
- Reads a JSON file that contains:
  `{ "current_partition_date": "20241116", "next_partition_date": "20241117" }`
- `current_partition_date` refers to the latest partition in the curated layer (the baseline for CDC).
- `next_partition_date` refers to the partition to which the current run's transformed data should be written.

Set Variable Activities:
- Store the `current_partition_date` and `next_partition_date` values from the Lookup activity output.

Data Flow Activity:
- Passes these values (`current_partition_date` and `next_partition_date`) into the Data Flow for dynamic path resolution.
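Here is a rough sketch of the two Set Variable activities above (assuming the Lookup activity is named `Lookup1` and returns a single row with `firstRowOnly` enabled; activity names are placeholders):

```json
[
  {
    "name": "Set current_partition_date",
    "type": "SetVariable",
    "dependsOn": [
      { "activity": "Lookup1", "dependencyConditions": [ "Succeeded" ] }
    ],
    "typeProperties": {
      "variableName": "current_partition_date",
      "value": {
        "value": "@activity('Lookup1').output.firstRow.current_partition_date",
        "type": "Expression"
      }
    }
  },
  {
    "name": "Set next_partition_date",
    "type": "SetVariable",
    "dependsOn": [
      { "activity": "Lookup1", "dependencyConditions": [ "Succeeded" ] }
    ],
    "typeProperties": {
      "variableName": "next_partition_date",
      "value": {
        "value": "@activity('Lookup1').output.firstRow.next_partition_date",
        "type": "Expression"
      }
    }
  }
]
```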
In the Data Flow:
- I have a single source transformation where I've enabled the CDC (Change Data Capture) option.
- The dataset used for the source is parameterized as mentioned above (`@concat(dataset().DirectoryName, '/', dataset().partition_folder)`).
I want the Data Flow to:
- For the current run: compare the new source file (e.g., `{file_system}/raw/supplier/20241117/data.parquet`) with the baseline data from the curated layer (e.g., `{file_system}/curated/supplier/20241116/data_today.parquet`).
- Identify changes (inserts, updates, deletes) between these files using CDC.
- Write the results to the "curated" layer for the `next_partition_date` (e.g., `{file_system}/curated/supplier/20241117`).
However, I am confused about:
- How the CDC transformation in the source knows to compare the current file with the baseline.
- How to ensure the Data Flow dynamically reads the correct baseline file (`current_partition_date`) and writes to the correct target folder (`next_partition_date`).
- How to configure the CDC-enabled source transformation in the Data Flow to handle the dynamic dataset parameters (`current_partition_date` and `next_partition_date`) for file paths.
- For the first run, where no baseline file exists yet, how should CDC handle the absence of a previous partition (i.e., treat all records as inserts)?
- Is there a better approach to this entire workflow than what I've described?
Any guidance or best practices would be greatly appreciated. Thank you in advance!
As per the documentation,
Change Data Capture (CDC) will copy all the source data to the target in the first run; from the next run onward it will only compare the already-processed source data rows with the current source data rows and return the modified rows. It won't compare the source data rows with the target data rows.
So, in this case you can't compare the source file with the target file from the previous run. Even if you try CDC on the current and previous source files by applying a wildcard file path, CDC on files will always return all the rows from a newly loaded or newly modified file. AFAIK, CDC on files works from the last modified time of the files; it won't compare the rows between the files.
As your requirement is to compare the previous target file with the current source file, you can try the workaround below. However, this requires a key column that is populated for all of your rows.
For the first run, you need to copy all the source data to the target file. From the second pipeline run onward (once a target file exists), you can follow the approach below.
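If you want to automate that first-run check rather than handling it manually, one option (an optional sketch, not part of the steps below) is a Get Metadata activity with the `exists` field on the parameterized curated dataset, feeding an If Condition: put the CDC data flow in `ifTrueActivities` and a plain copy of the source data in `ifFalseActivities`. The activity names are placeholders, and in practice the partition folder value would come from the `current_partition_date` variable rather than the literal shown here:

```json
[
  {
    "name": "CheckBaselineExists",
    "type": "GetMetadata",
    "typeProperties": {
      "dataset": {
        "referenceName": "ds_supplier_parquet",
        "type": "DatasetReference",
        "parameters": {
          "p_DirectoryName": "/curated/supplier",
          "p_partition_folder": "20241116"
        }
      },
      "fieldList": [ "exists" ]
    }
  },
  {
    "name": "BaselineExists",
    "type": "IfCondition",
    "dependsOn": [
      { "activity": "CheckBaselineExists", "dependencyConditions": [ "Succeeded" ] }
    ],
    "typeProperties": {
      "expression": {
        "value": "@activity('CheckBaselineExists').output.exists",
        "type": "Expression"
      },
      "ifTrueActivities": [],
      "ifFalseActivities": []
    }
  }
]
```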
Here, you need to handle three types of rows: updated, new, and deleted.
Take two sources in the data flow: use your parameterized source dataset for one source (Source1) and the parameterized target dataset for the other (Source2). If there are multiple files in your source folder `{file_system}/raw/supplier/20241117` and you only want the data from the latest files, you can apply CDC here. Import the projection on both sources by giving any sample values to the dataset parameters.
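For illustration, the data flow definition with the two sources might look roughly like this (dataset and stream names are placeholders; the Exists transformations described below are added on top of these two streams):

```json
{
  "name": "df_supplier_cdc",
  "properties": {
    "type": "MappingDataFlow",
    "typeProperties": {
      "sources": [
        {
          "dataset": { "referenceName": "ds_raw_supplier", "type": "DatasetReference" },
          "name": "Source1"
        },
        {
          "dataset": { "referenceName": "ds_curated_supplier", "type": "DatasetReference" },
          "name": "Source2"
        }
      ],
      "sinks": [
        {
          "dataset": { "referenceName": "ds_curated_supplier", "type": "DatasetReference" },
          "name": "Sink1"
        }
      ]
    }
  }
}
```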
Updated rows:
For the updated rows, use an Exists transformation with Source1 as the left stream and Source2 as the right stream, and select the Exists option. Use the key column from both the left and right streams in the exists condition.
This will give the common rows, with the updated values if there are any.
New rows:
For the new rows, take another Exists transformation with Source1 and Source2 as the left and right streams, select the Doesn't exist option, and give the same key columns as above.
Deleted rows:
For the deleted rows, take one more Exists transformation with the streams reversed (Source2 as the left stream and Source1 as the right stream), select the Doesn't exist option, and use the same key columns.
In this way, you can identify the inserts, updates, and deletes by comparing the current source data with the previous target file's data. You can apply your own logic on top of these transformations as per your requirement and add your target dataset as the sink.
In the dataflow activity, pass the values for the dataset parameters.
For the above sample, I used the values below; in your pipeline you would pass your own expressions for these values.
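For example, with stream names like the hypothetical Source1/Source2/Sink1 above, the per-stream dataset parameter values could look like this (shown as plain JSON for readability, not the exact activity payload; the `@variables(...)` expressions assume the pipeline variables described in the question):

```json
{
  "Source1": {
    "p_DirectoryName": "/raw/supplier",
    "p_partition_folder": "@variables('next_partition_date')"
  },
  "Source2": {
    "p_DirectoryName": "/curated/supplier",
    "p_partition_folder": "@variables('current_partition_date')"
  },
  "Sink1": {
    "p_DirectoryName": "/curated/supplier",
    "p_partition_folder": "@variables('next_partition_date')"
  }
}
```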