apache-spark · pyspark · databricks · delta

MERGE INTO using PySpark on Databricks


Can someone please help me with the problem below? I need to perform a merge operation in Databricks, and I need help handling deletes ('D'). Here is the update (source) table:

update_data = [(i, f'updatedFirstName{i}', 50000 + i * 1000) for i in range(1, 10)]
update_columns = ["id", "firstName", "salary"]
df_updates = spark.createDataFrame(update_data, update_columns)


Assume this is my target Delta table:

data = [(i, f'firstName{i}', 'I', 40000 + i * 500) for i in range(1, 201)]
columns = ["id", "firstName", "Crud", "salary"]
df_tgt = spark.createDataFrame(data, columns)


I get the keys that should be marked as 'D' as a DataFrame, and I collect them using:

del_keys = [row.id for row in df_del_keys.collect()]

Let's assume the deleted keys with id values 101 and 102 should be marked as 'D' in the target. The deleted keys (101 and 102) are guaranteed not to be present in the updates (source) table.

What I tried:

from pyspark.sql.functions import col, lit

mergeCondition = " AND ".join([f"tgt.{field} = src.{field}" for field in ["id"]])
updateCondition = "tgt.salary != src.salary"
columnsList = ['firstName', 'salary']
updateValues = {"tgt." + field: col("src." + field) for field in columnsList}
updateValues.update({"tgt.Crud": lit('U')})
del_keys = [row.id for row in df_del_keys.collect()]
(
    deltaTablePeople.alias('tgt')
    .merge(deltaTablePeopleUpdates.alias('src'), mergeCondition)
    .whenMatchedUpdate(
        condition=updateCondition,
        set=updateValues)
    .whenNotMatchedBySourceUpdate(
        condition=f"id IN ({', '.join(map(str, del_keys))})",
        set={"Crud": lit('D')})
    .execute()
)

This works, but if I get many del_keys (possibly thousands), the driver node can't hold them all. So I tried the following:

mergeCondition = " AND ".join([f"tgt.{field} = src.{field}" for field in ["id"]])
updateCondition = "tgt.salary != src.salary"
columnsList = ['firstName', 'salary']
updateValues = {"tgt." + field: col("src." + field) for field in columnsList}
updateValues.update({"tgt.Crud": lit('U')})

# df_del_keys holds the ids to soft-delete, e.g. 101 and 102
df_del_keys.createOrReplaceTempView("temp_del_keys")

(
    deltaTablePeople.alias('tgt')
    .merge(deltaTablePeopleUpdates.alias('src'), mergeCondition)
    .whenMatchedUpdate(
        condition=updateCondition,
        set=updateValues)
    .whenNotMatchedBySourceUpdate(
        condition="id IN (select id from temp_del_keys)",
        set={"Crud": lit('D')})
    .execute()
)

It looks like subqueries in merge conditions aren't supported in Databricks. Can someone please help me with how to handle the deletes (del_keys)?

I expect the target table to look as follows after the merge, and I want to achieve this with a single merge operation.


Values of the salary attribute for ids 11, 12, 101, and 102 will remain as they are in the target.

Crud for ids 11 and 12 won't change, because 11 and 12 aren't present in the update (source) table. But Crud for 101 and 102 should be updated to 'D', because they are to be marked as deleted.


Solution

  • Since you have two input reference tables, you need to do some kind of union to use a single MERGE statement, if you want to avoid running two MERGE statements.

    First, create the target table and the two input views:

    from pyspark.sql.types import IntegerType

    update_data = [(i, f'updatedFirstName{i}', 50000 + i * 1000) for i in range(1, 7)]
    update_columns = ["id", "firstName", "salary"]
    df_updates = spark.createDataFrame(update_data, update_columns)
    df_updates.createOrReplaceTempView("vw_update")


    data = [(i, f'firstName{i}', 'I', 40000 + i * 500) for i in range(1, 12)]
    columns = ["id", "firstName", "Crud", "salary"]
    df_tgt = spark.createDataFrame(data, columns)
    df_tgt.write.format("delta").saveAsTable("target_tbl")

    df_del_keys = spark.createDataFrame([10, 11], IntegerType())
    df_del_keys.show(truncate=False)
    df_del_keys.createOrReplaceTempView("vw_del_keys")
    

    Now use one MERGE statement. I'll use Databricks SQL here, but you can convert it to regular PySpark with:

    spark.sql("<databricks sql statement>")
    

    Note the union I've done between vw_update and vw_del_keys. The WHEN NOT MATCHED clause is present in case you would like to insert new entries via the vw_update view itself. If not, you can remove it; however, there is no harm in keeping it.

    %sql
    MERGE INTO target_tbl
    USING (SELECT * from vw_update UNION SELECT value as id, null as firstName, -1 as salary FROM vw_del_keys) as A
    ON (A.id = target_tbl.id)
    WHEN MATCHED AND A.salary > -1 THEN
      UPDATE SET
        crud = "U",
        salary = A.salary,
        firstName = A.firstName
    WHEN MATCHED AND A.salary = -1 THEN
      UPDATE SET
        crud = "D"
    WHEN NOT MATCHED
      THEN INSERT (
        id,
        firstName,
        salary,
        crud
      )
      VALUES (
        A.id,
        A.firstName,
        A.salary,
        "I"
      );
    
    SELECT * FROM target_tbl;
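
    The SQL above can also be sketched with the Python DeltaTable API instead of %sql. The following is an untested sketch: it assumes delta-spark is installed and that df_updates, vw_del_keys, and target_tbl exist as created above; merge_updates_and_deletes is a hypothetical helper name, not an existing API.

```python
def merge_updates_and_deletes(spark, df_updates, target_table="target_tbl"):
    """Apply updates, soft deletes, and inserts in one MERGE.

    Hypothetical helper; assumes delta-spark is available and that
    `vw_del_keys` and `target_table` exist as created above.
    """
    from delta.tables import DeltaTable          # imported lazily
    from pyspark.sql.functions import col, lit

    # Sentinel rows (salary = -1) for the keys that must be marked 'D'.
    df_dels = spark.sql(
        "SELECT value AS id, CAST(NULL AS STRING) AS firstName, "
        "CAST(-1 AS BIGINT) AS salary FROM vw_del_keys")
    src = df_updates.unionByName(df_dels)

    (DeltaTable.forName(spark, target_table).alias("tgt")
        .merge(src.alias("src"), "tgt.id = src.id")
        .whenMatchedUpdate(                      # real update rows
            condition="src.salary > -1",
            set={"tgt.Crud": lit("U"),
                 "tgt.salary": col("src.salary"),
                 "tgt.firstName": col("src.firstName")})
        .whenMatchedUpdate(                      # sentinel rows -> soft delete
            condition="src.salary = -1",
            set={"tgt.Crud": lit("D")})
        .whenNotMatchedInsert(                   # brand-new ids, if any
            values={"id": col("src.id"),
                    "firstName": col("src.firstName"),
                    "salary": col("src.salary"),
                    "Crud": lit("I")})
        .execute())
```

    Note that chaining two conditional whenMatchedUpdate clauses mirrors the two WHEN MATCHED branches in the SQL version.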
    

    Your target table will now contain the updated rows 1-6 (Crud = 'U'), the soft-deleted rows 10 and 11 (Crud = 'D'), and the untouched rows 7-9.
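
    As a sanity check of the branch logic, here is a plain-Python simulation of the same MERGE semantics (no Spark required): dicts stand in for rows, and the salary = -1 sentinel drives the soft-delete branch.

```python
# Plain-Python model of the MERGE branches. This only illustrates the
# semantics of the union-plus-sentinel trick; it is not Spark code.

def simulate_merge(target_rows, source_rows):
    tgt = {row["id"]: dict(row) for row in target_rows}
    for row in source_rows:
        if row["id"] in tgt:                     # WHEN MATCHED
            if row["salary"] > -1:               # real update row
                tgt[row["id"]].update(Crud="U",
                                      salary=row["salary"],
                                      firstName=row["firstName"])
            else:                                # salary == -1 sentinel
                tgt[row["id"]]["Crud"] = "D"     # soft delete
        else:                                    # WHEN NOT MATCHED
            tgt[row["id"]] = {**row, "Crud": "I"}
    return sorted(tgt.values(), key=lambda r: r["id"])

# Same data as the setup above: target ids 1-11, updates ids 1-6,
# deletes for ids 10 and 11.
target = [{"id": i, "firstName": f"firstName{i}", "Crud": "I",
           "salary": 40000 + i * 500} for i in range(1, 12)]
updates = [{"id": i, "firstName": f"updatedFirstName{i}",
            "salary": 50000 + i * 1000} for i in range(1, 7)]
deletes = [{"id": i, "firstName": None, "salary": -1} for i in (10, 11)]

result = simulate_merge(target, updates + deletes)
# ids 1-6 become 'U', ids 10-11 become 'D' (salary untouched),
# ids 7-9 are left as inserted.
```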