I'm trying to modify pandas insertion method using COPY. The purpose is to implement an "upsert" mechanism for Postgres database.
I'm using this SO answer for creating temp table and copying data into it, then inserting into target table.
The following code is working, but I had to set primary_key to my real table PK explicitly. The question is, can I get PK from visible in this scope variables?
import csv
from io import StringIO
from typing import Iterable
from sqlalchemy.engine.base import Connection
from pandas.io.sql import SQLTable
# Alternative to_sql() *method* for DBs that support COPY FROM
# https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method
def psql_upsert_copy(table: SQLTable, conn: Connection, keys: Iterable, data_iter: Iterable[tuple]):
# gets a DBAPI connection that can provide a cursor
dbapi_conn = conn.connection
with dbapi_conn.cursor() as cur:
s_buf = StringIO()
writer = csv.writer(s_buf)
writer.writerows(data_iter)
s_buf.seek(0)
columns = ', '.join(f'"{k}"' for k in keys)
excluded_columns = ', '.join(f'EXCLUDED."{k}"' for k in keys)
# is it possible to get it from the table?
primary_key = ', '.join(['"PK_col_a"', '"PK_col_b"'])
if table.schema:
table_name = f'{table.schema}.{table.name}'
else:
table_name = table.name
sql = f'''
CREATE TEMP TABLE tmp_table
ON COMMIT DROP
AS SELECT * FROM {table_name}
WITH NO DATA;
COPY tmp_table ({columns}) FROM STDIN WITH CSV;
INSERT INTO {table_name}
SELECT *
FROM tmp_table
ON CONFLICT ({primary_key}) DO UPDATE
SET ({columns}) = ({excluded_columns});
'''
cur.copy_expert(sql=sql, file=s_buf)
P.S. Usage is like so:
df.to_sql(name='orinal_table_name', con=some_psql_db_engine, if_exists='append', index=False, method=psql_upsert_copy)