
ADF to Snowflake incremental load and streams


I am trying to incrementally load files from Azure Blob storage into a Snowflake table. After that, in Snowflake, I put a stream on that table and load the data into the target table.

I am unable to do the incremental load from Azure to Snowflake. I have tried several approaches, but none of them works. I am attaching images of the two pipelines I tried.

In the first pipeline, I just added 3 extra columns that I wanted in my sink.

In the second pipeline, I tried creating conditional splits.

Neither of these has worked. Please suggest how to go about this.


Solution

  • You can achieve this by selecting Allow upsert as the Update method in the sink settings.

    Below are my repro details:

    1. Staging table in Snowflake, into which I am loading the incremental data.

    2. Source file (incremental data):

    a) This file contains two records that already exist in the staging table (StateCode = 'AK' and 'CA'), so these records should be updated in the staging table with the new Flag values.

    b) The other two records (StateCode = 'FL' and 'AZ') do not exist yet and should be inserted into the staging table.

    3. Configure the data flow source to read the incremental source file.

    4. Add a DerivedColumn transformation to add the column modified_date, which exists in the sink table but not in the source file.
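
To make the DerivedColumn step concrete, here is a rough Python sketch of what it does to each row. The function name, the sample Flag values, and the fixed timestamp are made up for illustration. In the data flow itself this would be an expression such as currentTimestamp(), but the repro does not show which expression was actually used.

```python
from datetime import datetime, timezone


def add_modified_date(rows, now=None):
    """Return copies of the rows with a modified_date column added."""
    stamp = now if now is not None else datetime.now(timezone.utc)
    # Build new dicts so the source rows are left untouched.
    return [{**row, "modified_date": stamp} for row in rows]


# Hypothetical sample rows; the Flag values are invented for this sketch.
source_rows = [
    {"StateCode": "AK", "Flag": "Y"},
    {"StateCode": "FL", "Flag": "N"},
]

# A fixed timestamp keeps the example deterministic.
fixed = datetime(2022, 1, 1, tzinfo=timezone.utc)
enriched = add_modified_date(source_rows, now=fixed)
```

Every row gets the same timestamp, which matches the idea of stamping one load run with one modified_date.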

    5. Add an AlterRow transformation. When you use the Upsert method, an AlterRow transformation is required to define the upsert condition.

    a) In the condition, you can specify that rows are upserted only when the unique column (StateCode in my case) is not null.
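
The AlterRow condition can be pictured as a per-row check on the key column. The sketch below mimics an "upsert if StateCode is not null" rule in plain Python; in the data flow expression language the condition would look something like !isNull(StateCode). The function name and sample rows are assumptions, and what ADF does with rows that match no condition depends on the sink settings, so this sketch simply sets them aside.

```python
def mark_for_upsert(rows, key="StateCode"):
    """Split rows into those marked for upsert and those left unmarked."""
    marked, unmarked = [], []
    for row in rows:
        # A row qualifies for upsert only when its key column has a value.
        if row.get(key) is not None:
            marked.append(row)
        else:
            unmarked.append(row)
    return marked, unmarked


# Hypothetical rows: the second one has no key and cannot be matched in the sink.
rows = [
    {"StateCode": "CA", "Flag": "Y"},
    {"StateCode": None, "Flag": "N"},
    {"StateCode": "AZ", "Flag": "Y"},
]
marked, unmarked = mark_for_upsert(rows)
```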

    6. Add a sink transformation after AlterRow, with the Snowflake staging table as the sink dataset.

    a) In the sink settings, select Allow upsert as the Update method and provide the key (unique) column on which the upsert should match rows in the sink table.
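
Conceptually, an upsert on a key column behaves like a Snowflake MERGE statement. The sketch below builds such a statement as a string, only to show the shape of the logic. The table names STATE_STAGE and STATE_INCOMING are hypothetical, identifiers are not escaped, and this is not necessarily the statement ADF issues behind the scenes.

```python
def build_merge_sql(target, source, key, columns):
    """Build a MERGE statement that upserts `columns` from source into target on `key`."""
    # The key column is used for matching, so it is not part of the UPDATE.
    update_cols = [c for c in columns if c != key]
    set_clause = ", ".join(f"{target}.{c} = {source}.{c}" for c in update_cols)
    col_list = ", ".join(columns)
    val_list = ", ".join(f"{source}.{c}" for c in columns)
    return (
        f"MERGE INTO {target}\n"
        f"USING {source}\n"
        f"  ON {target}.{key} = {source}.{key}\n"
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        f"WHEN NOT MATCHED THEN INSERT ({col_list}) VALUES ({val_list})"
    )


# Hypothetical table names; the columns are the ones mentioned in this repro.
sql = build_merge_sql(
    target="STATE_STAGE",
    source="STATE_INCOMING",
    key="StateCode",
    columns=["StateCode", "Flag", "modified_date"],
)
print(sql)
```

The same pattern could be used on the Snowflake side when moving changes from the staging table to the target table.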

    7. After you run the pipeline, you can see the results in the sink table.

    a) StateCode AK and CA already exist in the table, so only the other column values (Flag and modified_date) are updated for those rows.

    b) StateCode AZ and FL are not found in the staging (sink) table, so these rows are inserted.
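
The outcome described above can be reproduced with a small in-memory simulation. This is only a sketch of upsert semantics on the StateCode key: the staging contents, Flag values, and dates are invented, since the original screenshots are not available.

```python
def upsert(table, incoming, key="StateCode"):
    """Upsert incoming rows into table, a dict keyed by the key column.

    Returns the lists of keys that were updated and inserted.
    """
    updated, inserted = [], []
    for row in incoming:
        k = row[key]
        if k in table:
            # Key exists: overwrite only the columns present in the incoming row.
            table[k].update(row)
            updated.append(k)
        else:
            # Key not found: add the row as a new record.
            table[k] = dict(row)
            inserted.append(k)
    return updated, inserted


# Hypothetical staging table before the load.
staging = {
    "AK": {"StateCode": "AK", "Flag": "N", "modified_date": "2022-01-01"},
    "CA": {"StateCode": "CA", "Flag": "N", "modified_date": "2022-01-01"},
}

# Hypothetical incremental file after the DerivedColumn step.
incoming = [
    {"StateCode": code, "Flag": "Y", "modified_date": "2022-02-01"}
    for code in ["AK", "CA", "FL", "AZ"]
]

updated, inserted = upsert(staging, incoming)
print(sorted(updated))   # ['AK', 'CA']
print(sorted(inserted))  # ['AZ', 'FL']
```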