Tags: pyspark, spark-structured-streaming, debezium, delta-lake, yugabytedb

Delta Lake table update column-wise in parallel


I hope everyone is doing well. I have a long question, so please bear with me.

Context:
So I have CDC payloads coming from the Debezium connector for Yugabyte in the following form:
r"""
{
"payload": {
"before": null,
"after": {
"id": {
"value": "MK_1",
"set": true
},
"status": {
"value": "new_status",
"set": true
},
"status_metadata": {
"value": "new_status_metadata",
"set": true
},
"creator": {
"value": "new_creator",
"set": true
},
"created": null,
"creator_type": null,
"updater": null,
"updated": null,
"updater_type": {
"value": "new_updater_type",
"set": true
}
},
"source": {
"version": "1.7.0.13-BETA",
"connector": "yugabytedb",
"name": "dbserver1",
"ts_ms": -4258763692835,
"snapshot": "false",
"db": "yugabyte",
"sequence": "[\"0:0::0:0\",\"1:338::0:0\"]",
"schema": "public",
"table": "customer",
"txId": "",
"lsn": "1:338::0:0",
"xmin": null
},
"op": "u",
"ts_ms": 1669795724779,
"transaction": null
}
}
"""
The payload consists of before and after fields. As visible from `op: "u"`, this is an update operation: a row with id MK_1 in the Yugabyte table customer was updated with new values. However, the after field only carries the columns whose values were updated. The fields in "after" that are null have not been updated; e.g. created is null and was therefore not updated, whereas status is {"value": "new_status", "set": true}, which means the status column was updated to the new value "new_status". Now I have a PySpark Structured Streaming pipeline which takes in these payloads, processes them, and builds a micro-batch DataFrame of the following form:
id | set_id | status | set_status | status_metadata | set_status_metadata | creator | set_creator | created | set_created | creator_type | set_creator_type | updater | set_updater | updated | set_updated | updater_type | set_updater_type
The "set_column" is either true or false depending on the payload.

Problem:
Now I have a Delta table on Delta Lake with the following schema:
id | status | status_metadata | creator | created | creator_type | updater | updated | updater_type
And I am using the following code to update the above Delta table using the Python Delta Lake API (v2.2.0):
for column in fields_map:
    delta_lake_merger.whenMatchedUpdate(
        condition=f"update_table.op = 'u' AND update_table.set_{column} = 'true'",
        set={column: fields_map[column]}
    ).execute()
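
Here fields_map maps each target column to the corresponding expression on the update side; a hypothetical minimal reconstruction (it is not shown above) would be:

# Hypothetical reconstruction of fields_map: target column -> source-side expression.
fields_map = {c: f"update_table.{c}" for c in columns}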

Now you might be wondering why I am updating column-wise rather than all columns at once. This is exactly the problem I am facing. If I update all of the columns at once, without the set_{column} = 'true' condition, the merge overwrites the entire state of the matching rows in the Delta table. This is not what I want.

What do I want?
I only want to update those columns whose values are not null in the payload. If I update all columns at once like this:
delta_lake_merger.whenMatchedUpdate(
    condition="update_table.op = 'u'",
    set=fields_map
).execute()

Then the Delta Lake API will also overwrite the columns that were not updated with nulls, since null is the value those columns carry in the CDC payload. The iterative column-wise solution above works because, for each column, it skips the rows whose set_<column> flag is false and therefore keeps the existing value in the Delta table.

However, this is slow, since it rewrites the data N times sequentially (once per column), which bottlenecks my streaming query. Since all of the column-wise updates are independent, is there any way in the Delta Lake Python API to update all of the columns at once while still honoring the set_<column> condition? I suspect there is, because each iteration is just an independent conditional write for one column. I want to call execute() once for all columns with the set condition rather than putting it in a loop.

PS: I was thinking of using Python's asyncio library, but I am not so sure. Thank you so much.


Solution

  • I have been able to find a solution. If someone is stuck on a similar problem, you can supply a CASE WHEN expression for each column in the set dictionary of whenMatchedUpdate:

    delta_lake_merger.whenMatchedUpdate(
        condition="update_table.op = 'u'",
        set={
            column: f"CASE WHEN update_table.set_{column} = 'true' "
                    f"THEN update_table.{column} ELSE main_table.{column} END"
            for column in fields_map
        }
    ).execute()
    

    This executes the update for all of the columns at once in a single merge, with each column guarded by its own set condition.
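
    For completeness, here is a fuller sketch of how this single merge might be wired into the streaming query via foreachBatch. The table path /delta/customer, the spark session variable, the columns list, and the function name upsert_microbatch are assumptions for illustration, not part of the original question:

    from delta.tables import DeltaTable

    columns = ["id", "status", "status_metadata", "creator", "created",
               "creator_type", "updater", "updated", "updater_type"]

    def upsert_microbatch(updates_df, batch_id):
        # Hypothetical location of the target Delta table.
        main = DeltaTable.forPath(spark, "/delta/customer")

        # One CASE WHEN per column: take the new value only when the
        # set_<column> flag is true, otherwise keep the current value.
        set_expr = {
            c: f"CASE WHEN update_table.set_{c} = 'true' "
               f"THEN update_table.{c} ELSE main_table.{c} END"
            for c in columns
        }

        (main.alias("main_table")
             .merge(updates_df.alias("update_table"), "main_table.id = update_table.id")
             .whenMatchedUpdate(condition="update_table.op = 'u'", set=set_expr)
             .execute())

    # Attach to the stream so each micro-batch runs exactly one merge:
    # query = stream_df.writeStream.foreachBatch(upsert_microbatch).start()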