I have written a custom Airbyte destination in Python. It implements the incremental sync with deduplication operation using this query:
MERGE INTO {self.schema_name}.{table_name} AS target
USING {self.schema_name}.{table_name}_temp AS source
ON target.data_{primary_keys[0][0]}=source.data_{primary_keys[0][0]}
WHEN MATCHED THEN
{query_placeholder_refined}
WHEN NOT MATCHED THEN
INSERT *
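
For context, the statement above is built as a Python f-string and executed through spark.sql inside the destination's write path. A minimal sketch of that surrounding code (the function and parameter names here are simplified stand-ins, not my exact implementation):

from pyspark.sql import SparkSession

def merge_stream(spark: SparkSession, schema_name: str, table_name: str,
                 primary_key: str, update_set_clause: str) -> None:
    # update_set_clause is the generated "UPDATE SET ..." text that
    # replaces query_placeholder_refined in the template above.
    merge_sql = f"""
        MERGE INTO {schema_name}.{table_name} AS target
        USING {schema_name}.{table_name}_temp AS source
        ON target.data_{primary_key} = source.data_{primary_key}
        WHEN MATCHED THEN
          {update_set_clause}
        WHEN NOT MATCHED THEN
          INSERT *
    """
    spark.sql(merge_sql)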
Here the query_placeholder_refined variable is replaced with an UPDATE SET statement that updates every column of the target table from the corresponding source column. For instance, a simplified version of the resulting query could be:
MERGE INTO integration.issues AS target
USING integration.issues_temp AS source
ON target.data_id=source.data_id
WHEN MATCHED THEN
UPDATE SET target.data_issue_url=source.data_issue_url, target.data_user_id=source.data_user_id
WHEN NOT MATCHED THEN
INSERT *
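
The UPDATE SET fragment itself is generated from the stream's column list. A rough sketch of how such a fragment can be assembled (build_update_set and its column-list argument are hypothetical names for illustration, not my exact code):

def build_update_set(columns: list[str]) -> str:
    # Assign every column of the stream from source to target; the result
    # is what gets substituted for query_placeholder_refined.
    assignments = ", ".join(f"target.{col} = source.{col}" for col in columns)
    return f"UPDATE SET {assignments}"

# build_update_set(["data_issue_url", "data_user_id"]) returns:
# "UPDATE SET target.data_issue_url = source.data_issue_url,
#  target.data_user_id = source.data_user_id"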
The query runs perfectly for some streams, but for others it fails with this error:
pyspark.sql.utils.AnalysisException: Updates are in conflict for these columns: data_user_id
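
As far as I can tell, this message is raised (assuming the tables are Delta tables, which the MERGE ... INSERT * syntax suggests) when the same target column is assigned more than once in the UPDATE SET clause, so I suspect the generated clause contains a duplicate assignment for data_user_id on the failing streams. A small helper I could use to inspect the generated clause before running the MERGE (hypothetical debugging code, not part of my destination):

from collections import Counter

def find_duplicate_assignments(update_set_clause: str) -> list[str]:
    # Report target columns that appear more than once on the
    # left-hand side of an "UPDATE SET col = ..., col = ..." fragment.
    body = update_set_clause.removeprefix("UPDATE SET").strip()
    targets = [part.split("=")[0].strip() for part in body.split(",")]
    return [col for col, n in Counter(targets).items() if n > 1]

# A clause that assigns data_user_id twice would be flagged here, matching
# the column named in the AnalysisException:
# find_duplicate_assignments(
#     "UPDATE SET target.data_user_id = source.data_user_id, "
#     "target.data_user_id = source.user_id"
# )  # -> ["target.data_user_id"]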