I am trying to insert data incrementally from a Snowflake table into an Azure Databricks Delta Lake table.
Existing table data
id | name | salary | dept | address | createdate | updateddate | enddate |
---|---|---|---|---|---|---|---|
1 | aaa | 5000 | pharma | usa | 08-23-2022 | 01-01-9999 | 01-01-9999 |
2 | bbb | 6000 | mechanical | uk | 08-23-2022 | 01-01-9999 | 01-01-9999 |
3 | ccc | 7000 | pharma | singapore | 08-23-2022 | 01-01-9999 | 01-01-9999 |
New data added to the table
id | name | salary | dept | address | createdate | updateddate | enddate |
---|---|---|---|---|---|---|---|
1 | aa | 5000 | pharma | germany | 08-24-2022 | 01-01-9999 | 01-01-9999 |
4 | fff | 8000 | IT | finland | 08-24-2022 | 01-01-9999 | 01-01-9999 |
Expected data in the table
id | name | salary | dept | address | createdate | updateddate | enddate |
---|---|---|---|---|---|---|---|
1 | aaa | 5000 | pharma | usa | 08-23-2022 | 01-01-9999 | 01-01-9999 |
2 | bbb | 6000 | mechanical | uk | 08-23-2022 | 01-01-9999 | 01-01-9999 |
3 | ccc | 7000 | pharma | singapore | 08-23-2022 | 01-01-9999 | 01-01-9999 |
1 | aa | 5000 | pharma | germany | 08-24-2022 | 01-01-9999 | 01-01-9999 |
4 | fff | 8000 | IT | finland | 08-24-2022 | 01-01-9999 | 01-01-9999 |
Try a MERGE statement: it updates the matched rows and inserts the unmatched ones. If your data is held in memory as a DataFrame, first register it as a temporary view:
snowflakedf.createOrReplaceTempView("<ALIAS>")
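If you still need to pull snowflakedf out of Snowflake in the first place, here is a minimal sketch using the Spark Snowflake connector (all connection values and <SOURCE_TABLE> are placeholders, substitute your own):

# Read the incremental rows from Snowflake; every value below is a placeholder.
sf_options = {
    "sfUrl": "<account>.snowflakecomputing.com",
    "sfUser": "<user>",
    "sfPassword": "<password>",
    "sfDatabase": "<database>",
    "sfSchema": "<schema>",
    "sfWarehouse": "<warehouse>",
}

snowflakedf = (
    spark.read.format("snowflake")        # short name for the Snowflake connector on Databricks
    .options(**sf_options)
    .option("dbtable", "<SOURCE_TABLE>")  # the Snowflake table holding the new rows
    .load()
)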
Then, assuming your table's primary key is the combination of id and name:
MERGE INTO <EXISTING_TABLE_NAME> AS T1 USING
(
SELECT
id,
name,
salary,
dept,
address,
createdate,
date_format(from_utc_timestamp(current_timestamp(), "CET"),'yyyy-MM-dd') as updateddate,
date_format(from_utc_timestamp(current_timestamp(), "CET"),'yyyy-MM-dd') as enddate
FROM <ALIAS>
) AS TMP
ON T1.id = TMP.id AND T1.name = TMP.name
WHEN MATCHED THEN UPDATE SET
T1.createdate = TMP.createdate,
T1.updateddate = TMP.updateddate,
T1.enddate = TMP.enddate
WHEN NOT MATCHED THEN
INSERT *
For more information about how it works, see:
https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-merge-into.html
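The same merge can also be written with the Delta Lake Python API instead of SQL. A minimal sketch, assuming the target is a Delta table named <EXISTING_TABLE_NAME> and the source is the <ALIAS> view registered above:

from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Same date stamp the SQL above computes for updateddate/enddate
now_cet = F.date_format(F.from_utc_timestamp(F.current_timestamp(), "CET"), "yyyy-MM-dd")

target = DeltaTable.forName(spark, "<EXISTING_TABLE_NAME>")
source = (
    spark.table("<ALIAS>")
    .withColumn("updateddate", now_cet)
    .withColumn("enddate", now_cet)
)

(
    target.alias("T1")
    .merge(source.alias("TMP"), "T1.id = TMP.id AND T1.name = TMP.name")
    .whenMatchedUpdate(set={
        "createdate": "TMP.createdate",
        "updateddate": "TMP.updateddate",
        "enddate": "TMP.enddate",
    })
    .whenNotMatchedInsertAll()  # inserts every column of the unmatched source rows
    .execute()
)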
If a plain insert better fits your problem (and your expected output suggests it does, since both the old and the new row for id 1 are kept), try this instead:
INSERT INTO <EXISTING_TABLE_NAME>
SELECT
id,
name,
salary,
dept,
address,
createdate,
date_format(from_utc_timestamp(current_timestamp(), "CET"),'yyyy-MM-dd') as updateddate,
date_format(from_utc_timestamp(current_timestamp(), "CET"),'yyyy-MM-dd') as enddate
FROM <ALIAS>
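And the equivalent DataFrame append, under the same assumptions (<ALIAS> and <EXISTING_TABLE_NAME> are the placeholders used above):

from pyspark.sql import functions as F

now_cet = F.date_format(F.from_utc_timestamp(F.current_timestamp(), "CET"), "yyyy-MM-dd")

(
    spark.table("<ALIAS>")
    .withColumn("updateddate", now_cet)
    .withColumn("enddate", now_cet)
    .write.format("delta")
    .mode("append")                      # append-only: existing rows are left untouched
    .saveAsTable("<EXISTING_TABLE_NAME>")
)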