I have written a custom Airbyte destination in Python. I have implemented an incremental sync with deduplication using this query:
MERGE INTO {self.schema_name}.{table_name} AS target
USING {self.schema_name}.{table_name}_temp AS source
ON target.data_{primary_keys[0][0]}=source.data_{primary_keys[0][0]}
WHEN MATCHED THEN
{query_placeholder_refined}
WHEN NOT MATCHED THEN
INSERT *
Here the query_placeholder_refined variable is replaced with an UPDATE SET statement in which every column of the target table is updated from the source. For instance, a simplified version of the query could be:
MERGE INTO integration.issues as target
USING integration.issues_temp as source
ON target.data_id=source.data_id
WHEN MATCHED THEN
UPDATE SET target.data_issue_url=source.data_issue_url, target.data_user_id=source.data_user_id
WHEN NOT MATCHED THEN
INSERT *
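For context, here is a minimal sketch of how such a statement can be assembled and run from the destination code. The names build_update_clause and merge_stream, and the spark session argument, are my own illustration (assumed, not the actual implementation):

def build_update_clause(columns):
    # One assignment per column: target.col = source.col
    assignments = [f"target.{c} = source.{c}" for c in columns]
    return "UPDATE SET " + ", ".join(assignments)

def merge_stream(spark, schema_name, table_name, primary_key, columns):
    # Build the WHEN MATCHED clause, then run the full MERGE via Spark SQL.
    update_clause = build_update_clause(columns)
    query = f"""
        MERGE INTO {schema_name}.{table_name} AS target
        USING {schema_name}.{table_name}_temp AS source
        ON target.data_{primary_key} = source.data_{primary_key}
        WHEN MATCHED THEN
            {update_clause}
        WHEN NOT MATCHED THEN
            INSERT *
    """
    spark.sql(query)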
The query runs perfectly for some streams, but for other streams it fails with this error:
pyspark.sql.utils.AnalysisException: Updates are in conflict for these columns: data_user_id
I found the solution. The query itself is fine and works like a charm; the problem was that I wasn't passing the correct columns in the UPDATE SET line of the query, which is why Spark was throwing the error above. The error means the UPDATE SET clause ends up with conflicting assignments for the reported column (in my case because the column list I supplied was wrong).
There are two ways to fix it:
Replace the explicit column list with UPDATE SET *.
Pass the correct column names to the query.
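As an illustration of both fixes (a sketch with assumed helper names, not the actual destination code), the clause can either be hard-coded to UPDATE SET * or built from a deduplicated list of the target table's real columns so that no column is assigned twice:

# Fix 1: let Spark map all columns by name.
update_clause = "UPDATE SET *"

# Fix 2: build the clause only from columns that actually exist in the
# target table, skipping duplicates so no column is assigned more than once.
def build_update_clause(columns):
    seen = set()
    assignments = []
    for column in columns:
        if column in seen:
            continue
        seen.add(column)
        assignments.append(f"target.{column} = source.{column}")
    return "UPDATE SET " + ", ".join(assignments)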