I have a task to copy data from one schema to another within the same PostgreSQL database:
Insert:
Load the table incrementally, using project_id as the lookup key. Insert a project record into the table when its project_id does not exist yet.
Update:
If the project_id already exists and the values of other attributes have changed, update only the changed values for that project (project_id). A rough sketch of how I understand this upsert is shown below.
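
From reading the PostgreSQL docs, I believe INSERT ... ON CONFLICT can express both cases at once. This is only a sketch of my understanding, assuming a hypothetical table target_schema.projects with primary key project_id and made-up columns name and status (the real column list differs):

-- Sketch only: projects, name, and status are hypothetical names.
INSERT INTO target_schema.projects AS t (project_id, name, status)
SELECT s.project_id, s.name, s.status
FROM source_schema.projects AS s
ON CONFLICT (project_id) DO UPDATE
SET name   = EXCLUDED.name,
    status = EXCLUDED.status
-- Update only when at least one attribute actually changed:
WHERE (t.name, t.status) IS DISTINCT FROM (EXCLUDED.name, EXCLUDED.status);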
What I do not quite understand is what the query should look like. My basic attempt is:
INSERT INTO target_schema.supplies SELECT * FROM source_schema.supplies;
but it fails with this error:
[2021-08-10 13:05:23,482] {{taskinstance.py:1128}} ERROR - duplicate key value violates unique constraint "supplies_pkey"
DETAIL: Key (id)=(2) already exists.
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/local_copy_source_schema_to_target_schema.py", line 31, in copy_data_from_source_schema_to_target_schema
redshift_hook.run(sql_queries)
File "/usr/local/lib/python3.7/site-packages/airflow/hooks/dbapi_hook.py", line 175, in run
cur.execute(s)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "supplies_pkey"
DETAIL: Key (id)=(2) already exists.
The error itself is understandable: the target table already contains rows with the same primary key. But beyond that, this query does not fulfill the other conditions of the task (insert only missing records, update only changed values).
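
For the duplicate-key error alone, the closest variation I have found is ON CONFLICT ... DO NOTHING; assuming id is the primary key of supplies, as the error message suggests, it would handle the incremental insert but still not the update-on-change part:

-- Sketch: copy only rows whose primary key is not yet in the target.
-- This ignores the update-on-change requirement entirely.
INSERT INTO target_schema.supplies
SELECT * FROM source_schema.supplies
ON CONFLICT (id) DO NOTHING;

Is something like the ON CONFLICT ... DO UPDATE form above the right way to cover both conditions, or should the query look different?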