
I'm trying to customize the pandas insertion method so that it uses COPY. The goal is to implement an "upsert" mechanism for a Postgres database.

I'm using this SO answer to create a temp table, copy the data into it, and then insert from it into the target table.

The following code works, but I had to set primary_key to my real table's PK explicitly. The question is: can I get the PK from the variables visible in this scope?

import csv
from io import StringIO
from typing import Iterable

from sqlalchemy.engine.base import Connection
from pandas.io.sql import SQLTable


# Alternative to_sql() *method* for DBs that support COPY FROM
# https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method
def psql_upsert_copy(table: SQLTable, conn: Connection, keys: Iterable, data_iter: Iterable[tuple]):
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)
        columns = ', '.join(f'"{k}"' for k in keys)
        excluded_columns = ', '.join(f'EXCLUDED."{k}"' for k in keys)

        # is it possible to get it from the table?
        primary_key = ', '.join(['"PK_col_a"', '"PK_col_b"'])

        if table.schema:
            table_name = f'{table.schema}.{table.name}'
        else:
            table_name = table.name
        sql = f'''
        CREATE TEMP TABLE tmp_table
        ON COMMIT DROP
        AS SELECT * FROM {table_name}
        WITH NO DATA;

        COPY tmp_table ({columns}) FROM STDIN WITH CSV;

        INSERT INTO {table_name}
        SELECT *
        FROM tmp_table
        ON CONFLICT ({primary_key}) DO UPDATE
        SET ({columns}) = ({excluded_columns});
        '''
        cur.copy_expert(sql=sql, file=s_buf)

P.S. It is used like this:

df.to_sql(name='original_table_name', con=some_psql_db_engine, if_exists='append', index=False, method=psql_upsert_copy)
  • Are you set on using the pandas insertion method with COPY? SQLAlchemy has a built-in mechanism for upserting that works quite well. I tried it with to_sql in the past and moved to SQLAlchemy because it was a lot easier. – PvG Aug 01 '19 at 10:24
  • @PvGelder not really. It's just that I'm much more familiar with pandas than with the SQLAlchemy ORM. If you could give me some pointers on building an upsert mechanism with SQLAlchemy, that would be great. – Valery Kustov Aug 01 '19 at 10:32
  • It also seems possible to change the signature of `psql_upsert_copy` so that it takes one more argument, `primary_key: str`, and then use `functools.partial` to fill that argument before passing the function to `df.to_sql()`. But maybe the PK is already visible inside that scope? – Valery Kustov Aug 01 '19 at 10:40
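The `functools.partial` idea from the last comment could look roughly like the sketch below. It is only an illustration: `build_upsert_sql` is a hypothetical helper that reproduces the SQL from the question, and `primary_key` is an assumed keyword-only parameter holding a list of column names rather than a single string. pandas passes `table`, `conn`, `keys` and `data_iter` positionally, so the extra keyword can be bound in advance.

```python
import csv
from functools import partial
from io import StringIO
from typing import Iterable, Sequence


def build_upsert_sql(table_name: str, keys: Sequence[str], primary_key: Sequence[str]) -> str:
    """Hypothetical helper: build the temp-table COPY + upsert script from the question."""
    columns = ', '.join(f'"{k}"' for k in keys)
    excluded_columns = ', '.join(f'EXCLUDED."{k}"' for k in keys)
    conflict_target = ', '.join(f'"{k}"' for k in primary_key)
    return f'''
    CREATE TEMP TABLE tmp_table
    ON COMMIT DROP
    AS SELECT * FROM {table_name}
    WITH NO DATA;

    COPY tmp_table ({columns}) FROM STDIN WITH CSV;

    INSERT INTO {table_name}
    SELECT *
    FROM tmp_table
    ON CONFLICT ({conflict_target}) DO UPDATE
    SET ({columns}) = ({excluded_columns});
    '''


def psql_upsert_copy(table, conn, keys: Iterable[str], data_iter: Iterable[tuple],
                     *, primary_key: Sequence[str]):
    """to_sql() method with an extra keyword-only argument (an assumption of this sketch)."""
    keys = list(keys)
    # Same table-name logic as in the question
    table_name = f'{table.schema}.{table.name}' if table.schema else table.name
    # Serialize the rows to an in-memory CSV buffer for COPY
    s_buf = StringIO()
    csv.writer(s_buf).writerows(data_iter)
    s_buf.seek(0)
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        cur.copy_expert(sql=build_upsert_sql(table_name, keys, primary_key), file=s_buf)


# Bind the key columns once; the result has the four-argument signature pandas expects
upsert_method = partial(psql_upsert_copy, primary_key=['PK_col_a', 'PK_col_b'])
```

It would then be passed as `method=upsert_method` in the `df.to_sql(...)` call shown in the question.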

1 Answer


You can do something like this with SQLAlchemy:

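import psycopg2  # not used directly; the PostgreSQL driver SQLAlchemy loads
import sqlalchemy
from sqlalchemy import MetaData
from sqlalchemy.dialects.postgresql import insert


def upsert_data(df, url, schema, table, primarykey):
    # One dict per row, keyed by column name
    insrt_vals = df.to_dict(orient='records')
    engine = sqlalchemy.create_engine(url)
    # Reflect only the target table from the database
    meta = MetaData(schema=schema)
    meta.reflect(bind=engine, only=[table])
    # Reflected tables are keyed as "schema.table" when a schema is set
    table_used = meta.tables[f'{schema}.{table}' if schema else table]
    insrt_stmnt = insert(table_used).values(insrt_vals)

    # Update every non-key column with the value from the row that conflicted
    update_columns = {col.name: col for col in insrt_stmnt.excluded if col.name != primarykey}
    upsert_stmt = insrt_stmnt.on_conflict_do_update(index_elements=[primarykey], set_=update_columns)

    # Run the upsert in a transaction that commits on success
    with engine.begin() as conn:
        conn.execute(upsert_stmt)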

More info on this can be found here: https://docs.sqlalchemy.org/en/13/dialects/postgresql.html
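If you prefer to stay with the COPY-based method from the question, one way to avoid hard-coding the key is to read it from the Postgres system catalogs using the cursor that is already open there. The following is a minimal sketch, not a tested drop-in: `fetch_primary_key` and `conflict_target` are hypothetical helper names, the query relies on the `pg_index` and `pg_attribute` catalogs, and the `%s` placeholder assumes a psycopg2-style cursor.

```python
from typing import List, Sequence

# Names the columns of the table's primary-key index.
# The table name is cast to regclass, so it may be schema-qualified.
PK_QUERY = '''
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a
  ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = %s::regclass
  AND i.indisprimary
'''


def fetch_primary_key(cur, table_name: str) -> List[str]:
    """Return the primary-key column names of table_name (hypothetical helper)."""
    cur.execute(PK_QUERY, (table_name,))
    return [row[0] for row in cur.fetchall()]


def conflict_target(pk_columns: Sequence[str]) -> str:
    """Quote the column names for use inside ON CONFLICT (...)."""
    return ', '.join(f'"{c}"' for c in pk_columns)
```

Inside `psql_upsert_copy`, the hard-coded line could then become `primary_key = conflict_target(fetch_primary_key(cur, table_name))`, at the cost of one extra catalog query per `to_sql` chunk.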

PvG