pyspark, snowflake-cloud-data-platform, databricks, azure-databricks, delta-lake

How to insert new incremental data (keeping old records) into a Delta Lake table in Azure Databricks from a source Snowflake table


I am trying to insert data incrementally from a Snowflake table into an Azure Databricks Delta Lake table.

Snowflake table data and the expected output in Delta Lake:

Existing table data

id  name  salary  dept        address    createdate  updateddate  enddate
1   aaa   5000    pharma      usa        08-23-2022  01-01-9999   01-01-9999
2   bbb   6000    mechanical  uk         08-23-2022  01-01-9999   01-01-9999
3   ccc   7000    pharma      singapore  08-23-2022  01-01-9999   01-01-9999

New data added to the table

id  name  salary  dept    address  createdate  updateddate  enddate
1   aa    5000    pharma  germany  08-24-2022  01-01-9999   01-01-9999
4   fff   8000    IT      finland  08-24-2022  01-01-9999   01-01-9999

Expected data in table

id  name  salary  dept        address    createdate  updateddate  enddate
1   aaa   5000    pharma      usa        08-23-2022  01-01-9999   01-01-9999
2   bbb   6000    mechanical  uk         08-23-2022  01-01-9999   01-01-9999
3   ccc   7000    pharma      singapore  08-23-2022  01-01-9999   01-01-9999
1   aa    5000    pharma      germany    08-24-2022  01-01-9999   01-01-9999
4   fff   8000    IT          finland    08-24-2022  01-01-9999   01-01-9999

Solution

  • Try a MERGE statement: it will update the matched rows and insert the unmatched ones. If your data is held in memory as a DataFrame, first create a temporary view:

    snowflakedf.createOrReplaceTempView("<ALIAS>")
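
    If snowflakedf does not exist yet, it can be loaded with the Snowflake Spark connector first. A minimal sketch, where every connection option value and the source table name are placeholders for your own account details:

    # Connection options for the Snowflake Spark connector (all placeholders).
    sfOptions = {
        "sfUrl": "<ACCOUNT>.snowflakecomputing.com",
        "sfUser": "<USER>",
        "sfPassword": "<PASSWORD>",
        "sfDatabase": "<DATABASE>",
        "sfSchema": "<SCHEMA>",
        "sfWarehouse": "<WAREHOUSE>",
    }

    # Read the source table into a DataFrame; on Databricks the connector
    # is available under the short format name "snowflake".
    snowflakedf = (
        spark.read.format("snowflake")
        .options(**sfOptions)
        .option("dbtable", "<SOURCE_TABLE_NAME>")
        .load()
    )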
    

    Then, assuming your table's primary key is the combination of id and name:

    MERGE INTO <EXISTING_TABLE_NAME> AS T1
    USING (
        SELECT
            id,
            name,
            salary,
            dept,
            address,
            createdate,
            date_format(from_utc_timestamp(current_timestamp(), 'CET'), 'yyyy-MM-dd') AS updateddate,
            date_format(from_utc_timestamp(current_timestamp(), 'CET'), 'yyyy-MM-dd') AS enddate
        FROM <ALIAS>
    ) AS TMP
    ON T1.id = TMP.id AND T1.name = TMP.name
    WHEN MATCHED THEN UPDATE SET
        T1.createdate = TMP.createdate,
        T1.updateddate = TMP.updateddate,
        T1.enddate = TMP.enddate
    WHEN NOT MATCHED THEN
        INSERT *
    

    For more information about how it works, check:

    https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-merge-into.html
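
    The same merge can also be expressed with the Delta Lake Python API if you prefer to stay in PySpark. A minimal sketch, assuming the target is a Delta table registered under <EXISTING_TABLE_NAME> and snowflakedf is the source DataFrame from above:

    from delta.tables import DeltaTable
    from pyspark.sql import functions as F

    # Derive the date columns exactly as in the SQL subquery above.
    today_cet = F.date_format(
        F.from_utc_timestamp(F.current_timestamp(), "CET"), "yyyy-MM-dd"
    )
    source = (
        snowflakedf
        .withColumn("updateddate", today_cet)
        .withColumn("enddate", today_cet)
    )

    # Same merge: update the date columns on matched rows, insert the rest.
    (
        DeltaTable.forName(spark, "<EXISTING_TABLE_NAME>")
        .alias("T1")
        .merge(source.alias("TMP"), "T1.id = TMP.id AND T1.name = TMP.name")
        .whenMatchedUpdate(set={
            "createdate": "TMP.createdate",
            "updateddate": "TMP.updateddate",
            "enddate": "TMP.enddate",
        })
        .whenNotMatchedInsertAll()
        .execute()
    )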

    If a plain INSERT better fits your problem (your expected output keeps the old row for id 1 and simply appends the new one), try this:

    INSERT INTO <EXISTING_TABLE_NAME>
    SELECT
        id,
        name,
        salary,
        dept,
        address,
        createdate,
        date_format(from_utc_timestamp(current_timestamp(), 'CET'), 'yyyy-MM-dd') AS updateddate,
        date_format(from_utc_timestamp(current_timestamp(), 'CET'), 'yyyy-MM-dd') AS enddate
    FROM <ALIAS>
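
    Equivalently, this append can be done straight from the DataFrame, without the temporary view. A minimal sketch, assuming the same snowflakedf source:

    from pyspark.sql import functions as F

    today_cet = F.date_format(
        F.from_utc_timestamp(F.current_timestamp(), "CET"), "yyyy-MM-dd"
    )

    # Append the new rows, keeping all existing rows untouched.
    (
        snowflakedf
        .withColumn("updateddate", today_cet)
        .withColumn("enddate", today_cet)
        .write.format("delta")
        .mode("append")
        .saveAsTable("<EXISTING_TABLE_NAME>")
    )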