
I have a pandas DataFrame that I need to store into the database. Here's my current line of code for inserting:

df.to_sql(table, con=engine, if_exists='append', index_label=index_col)

This works fine if none of the rows in df exist in my table. If a row already exists, I get this error:

sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) duplicate key
value violates unique constraint "mypk"
DETAIL:  Key (id)=(42) already exists.
 [SQL: 'INSERT INTO mytable (id, owner,...) VALUES (%(id)s, %(owner)s,...']
 [parameters:...] (Background on this error at: http://sqlalche.me/e/gkpj)

and nothing is inserted.

PostgreSQL has an optional ON CONFLICT clause, which could be used to UPDATE the existing table rows. I read the entire pandas.DataFrame.to_sql manual page, but I couldn't find any way to use ON CONFLICT within the DataFrame.to_sql() function.
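For reference, this is the clause I mean. SQLite (3.24+) happens to share PostgreSQL's ON CONFLICT ... DO UPDATE syntax, so this sketch runs without a Postgres server; the table and column names are made up to match the error message above:

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL here: both accept the same
# ON CONFLICT (...) DO UPDATE syntax (SQLite since version 3.24).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mytable (id INTEGER PRIMARY KEY, owner TEXT)")
conn.execute("INSERT INTO mytable (id, owner) VALUES (42, 'alice')")

# A plain INSERT of id=42 would violate the primary key; with the
# ON CONFLICT clause the existing row is updated instead.
conn.execute(
    "INSERT INTO mytable (id, owner) VALUES (?, ?) "
    "ON CONFLICT (id) DO UPDATE SET owner = excluded.owner",
    (42, "bob"),
)
print(conn.execute("SELECT owner FROM mytable WHERE id = 42").fetchone()[0])
```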

I have considered splitting my DataFrame in two based on what's already in the db table. So now I have two DataFrames, insert_rows and update_rows, and I can safely execute

insert_rows.to_sql(table, con=engine, if_exists='append', index_label=index_col)

But then, there seems to be no UPDATE equivalent to DataFrame.to_sql(). So how do I update the table using DataFrame update_rows?
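The best UPDATE equivalent I can see so far is a manual per-row UPDATE. A minimal sketch of the idea, with in-memory SQLite and plain tuples standing in for my table and update_rows:

```python
import sqlite3

# Sketch of the missing UPDATE half: one parameterized UPDATE per row.
# Plain (id, owner) tuples stand in for update_rows.itertuples(); the
# data is made up for illustration.
update_rows = [(42, "bob"), (43, "carol")]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mytable (id INTEGER PRIMARY KEY, owner TEXT)")
conn.executemany(
    "INSERT INTO mytable (id, owner) VALUES (?, ?)",
    [(42, "alice"), (43, "alice")],
)

# executemany issues the UPDATE once per row, keyed on the primary key.
conn.executemany(
    "UPDATE mytable SET owner = ? WHERE id = ?",
    [(owner, id_) for id_, owner in update_rows],
)
conn.commit()
```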

Granny Aching

5 Answers


I know it's an old thread, but I ran into the same issue and this thread showed up in Google. None of the answers is really satisfying yet, so here's what I came up with:

My solution is pretty similar to zdgriffith's answer, but much more performant, as it issues a single statement instead of executing one per row of data_iter:

def postgres_upsert(table, conn, keys, data_iter):
    from sqlalchemy.dialects.postgresql import insert

    data = [dict(zip(keys, row)) for row in data_iter]

    insert_statement = insert(table.table).values(data)
    upsert_statement = insert_statement.on_conflict_do_update(
        constraint=f"{table.table.name}_pkey",
        set_={c.key: c for c in insert_statement.excluded},
    )
    conn.execute(upsert_statement)

Now you can use this custom upsert method in pandas' to_sql method like zdgriffith showed.

Please note that my upsert function uses the primary key constraint of the table. You can target another constraint by changing the constraint argument of .on_conflict_do_update.

This SO answer on a related thread explains the use of .excluded a bit more: https://stackoverflow.com/a/51935542/7066758
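If you want to see what the function builds without touching a database, you can compile the statement against a hypothetical table (the mytable / mytable_pkey names here are assumptions, following Postgres' default `<table>_pkey` constraint naming):

```python
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# Hypothetical two-column table, just to inspect the generated SQL.
meta = sa.MetaData()
table = sa.Table(
    "mytable",
    meta,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("owner", sa.Text),
)

insert_statement = postgresql.insert(table).values([{"id": 42, "owner": "bob"}])
upsert_statement = insert_statement.on_conflict_do_update(
    constraint="mytable_pkey",  # assumed default primary-key constraint name
    set_={c.key: c for c in insert_statement.excluded},
)
# Compile against the PostgreSQL dialect to see the ON CONFLICT clause.
sql = str(upsert_statement.compile(dialect=postgresql.dialect()))
print(sql)
```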

SaturnFromTitan

@SaturnFromTitan, thanks for the reply to this old thread. That worked like magic. I would upvote, but I don't have the rep.

For those who are as new to all this as I am: you can cut and paste SaturnFromTitan's answer and call it with something like:

    df.to_sql('my_table_name',
              dbConnection,
              schema='my_schema',
              if_exists='append',
              index=False,
              method=postgres_upsert)

And that's it. The upsert works.

Rivet174

To follow up on Brendan's answer with an example, this is what worked for me:

import os
import sqlalchemy as sa
import pandas as pd
from sqlalchemy.dialects.postgresql import insert


engine = sa.create_engine(os.getenv("DBURL"))
meta = sa.MetaData()
meta.bind = engine
meta.reflect(views=True)


def upsert(table, conn, keys, data_iter):
    upsert_args = {"constraint": "test_table_col_a_col_b_key"}
    for data in data_iter:
        data = {k: data[i] for i, k in enumerate(keys)}
        upsert_args["set_"] = data
        insert_stmt = insert(meta.tables[table.name]).values(**data)
        upsert_stmt = insert_stmt.on_conflict_do_update(**upsert_args)
        conn.execute(upsert_stmt)


if __name__ == "__main__":
    df = pd.read_csv("test_data.txt")
    with engine.connect() as conn:
        df.to_sql(
            "test_table",
            con=conn,
            if_exists="append",
            method=upsert,
            index=False,
        )

where in this example the schema would be something like:

CREATE TABLE test_table(
    col_a text NOT NULL,
    col_b text NOT NULL,
    col_c text,
    UNIQUE (col_a, col_b)
)
zdgriffith

If you look at the to_sql docs, there's mention of a method argument that takes a callable. Creating this callable should allow you to use the Postgres clauses you need. Here's an example of a callable they mentioned in the docs: https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method

It's pretty different from what you need, but follow the arguments passed to this callable. They will allow you to construct a regular SQL statement.
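For instance, a bare-bones callable with that signature might look like this. It's a sketch, not the docs' example: it runs against in-memory SQLite, where pandas hands the callable a DBAPI cursor as conn, and the table/data names are made up:

```python
import sqlite3
import pandas as pd

def plain_insert(table, conn, keys, data_iter):
    # pandas passes: the table wrapper (table.name is the table name),
    # a cursor/connection, the column names, and an iterator of row tuples.
    columns = ", ".join(keys)
    placeholders = ", ".join("?" for _ in keys)
    sql = f"INSERT INTO {table.name} ({columns}) VALUES ({placeholders})"
    conn.executemany(sql, list(data_iter))

con = sqlite3.connect(":memory:")
df = pd.DataFrame({"id": [1, 2], "owner": ["alice", "bob"]})
df.to_sql("mytable", con, index=False, method=plain_insert)
```

From here you can swap the plain INSERT for whatever Postgres-specific statement you need.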

Brendan Martin

If anybody wants to build on top of the answer from zdgriffith and dynamically generate the table constraint name, you can use the following query for PostgreSQL:

select distinct tco.constraint_name
from information_schema.table_constraints tco
         join information_schema.key_column_usage kcu
              on kcu.constraint_name = tco.constraint_name
                  and kcu.constraint_schema = tco.constraint_schema
where kcu.table_name = '{table.name}'
  and constraint_type = 'PRIMARY KEY';

You can then format this string to populate table.name inside the upsert() method.
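For example, a sketch of the string formatting (in the real upsert() you'd run the result through conn.execute() against PostgreSQL and read back the constraint name; the helper name here is made up):

```python
# Hypothetical helper: build the information_schema lookup for a table name.
# In upsert() you would execute this via conn.execute() and fetch one row.
def pk_constraint_query(table_name):
    return (
        "select distinct tco.constraint_name "
        "from information_schema.table_constraints tco "
        "join information_schema.key_column_usage kcu "
        "  on kcu.constraint_name = tco.constraint_name "
        "  and kcu.constraint_schema = tco.constraint_schema "
        f"where kcu.table_name = '{table_name}' "
        "  and constraint_type = 'PRIMARY KEY'"
    )

print(pk_constraint_query("test_table"))
```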

I also didn't require the meta.bind and meta.reflect() lines. The latter will be deprecated soon anyway.

rup