
I have a task to copy data from one schema to another in the same PostgreSQL database:

Insert:

Load the table incrementally using project_id as a lookup. Insert the project record into the table when its project_id does not exist.

Update:

If project_id already exists and the value of other attributes has changed, then update only the changed values for the corresponding project (project_id).

I do not quite understand what the query should be. My basic one is like this:

INSERT INTO target_schema.supplies SELECT * FROM source_schema.supplies;

but it fails with an error:

2021-08-10 13:05:23,482] {{taskinstance.py:1128}} ERROR - duplicate key value violates unique constraint "supplies_pkey"
DETAIL:  Key (id)=(2) already exists.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/local_copy_source_schema_to_target_schema.py", line 31, in copy_data_from_source_schema_to_target_schema
    redshift_hook.run(sql_queries)
  File "/usr/local/lib/python3.7/site-packages/airflow/hooks/dbapi_hook.py", line 175, in run
    cur.execute(s)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "supplies_pkey"
DETAIL:  Key (id)=(2) already exists.

And the other conditions of the task are not fulfilled.

1 Answer

You should try INSERT ... ON CONFLICT (an "upsert").

INSERT INTO users (id, level)
VALUES (1, 0)
ON CONFLICT (id) DO UPDATE
SET level = users.level + 1;

Taken from SO
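
Applied to your supplies tables, the same pattern might look like the sketch below. The column names (project_id, quantity) are assumptions, since the real column list is not shown; EXCLUDED refers to the row that the INSERT would have added, and the WHERE clause skips rows whose values are unchanged, which matches the "update only the changed values" requirement:

```sql
-- Hypothetical sketch: assumes supplies has primary key id and
-- columns project_id and quantity; replace with the real column list.
INSERT INTO target_schema.supplies AS t (id, project_id, quantity)
SELECT id, project_id, quantity
FROM source_schema.supplies
ON CONFLICT (id) DO UPDATE
SET project_id = EXCLUDED.project_id,
    quantity   = EXCLUDED.quantity
-- Only touch rows that actually differ; IS DISTINCT FROM treats
-- NULLs as comparable, unlike <>.
WHERE (t.project_id, t.quantity)
      IS DISTINCT FROM (EXCLUDED.project_id, EXCLUDED.quantity);
```

Note that ON CONFLICT requires a unique index or primary key on the conflict target column (here id), which supplies_pkey already provides.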

Jayadevan