21

The PostgreSQL ON CONFLICT clause in INSERT statements provides "upsert" functionality (i.e. update an existing record, or insert a new one if no such record exists). This functionality is supported in SQLAlchemy via the on_conflict_do_nothing and on_conflict_do_update methods on the PostgreSQL dialect's Insert object (as described here):

from sqlalchemy.dialects.postgresql import insert

insert_stmt = insert(my_table).values(
    id='some_existing_id',
    data='inserted value'
)

do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
    index_elements=['id']
)

conn.execute(do_nothing_stmt)

do_update_stmt = insert_stmt.on_conflict_do_update(
    constraint='pk_my_table',
    set_=dict(data='updated value')
)

conn.execute(do_update_stmt)

I am using flask_sqlalchemy, which manages SQLAlchemy's engine, session, and connections for you. To add an element to the database, I create an instance of my model, add it to the database session, and then call commit, something like this:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
db = SQLAlchemy(app)

class MyTable(db.Model):
    id = db.Column(UUID, primary_key=True)
    data = db.Column(db.String)

relation = MyTable(id=1, data='foo')
db.session.add(relation)
db.session.commit()

So the Insert object is completely wrapped and obscured by flask_sqlalchemy.

How can I access the PostgreSQL-specific dialect methods to perform an upsert? Do I need to bypass flask_sqlalchemy and create my own session? If I do this, how can I ensure no conflicts?

charrison
  • 806
  • 1
  • 7
  • 14

2 Answers2

26

It turns out you can execute a lower-level statement on the db.session. So a solution looks something like this:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.dialects.postgresql import insert as pg_insert

app = Flask(__name__)
db = SQLAlchemy(app)

class MyTable(db.Model):
    id = db.Column(UUID, primary_key=True)
    data = db.Column(db.String)

    def __init__(self, _id, *args, **kwargs):
        self.id = _id
        self.data = kwargs['data']

    def as_dict(self):
        return {'id': self.id, 'data': self.data}

    def props_dict(self):
        d = self.as_dict()
        d.pop('id')
        return d

relation = MyTable(id=1, data='foo')
statement = pg_insert(MyTable)\.
    values(**relation.as_dict()).\
    on_conflict_do_update(constraint='id',
                          set_=relation.props_dict())

db.session.execute(statement)
db.session.commit()

The as_dict() and props_dict() methods in my model class allow me to use the constructor to filter out unwanted properties from the incoming HTTP request.

charrison
  • 806
  • 1
  • 7
  • 14
  • 1
    This fails for me with the error that `constraint "id" for table "foobar" does not exist`, even though `id` is created as a `db.Column(db.Integer, primary_key=True)` field. – deed02392 Sep 05 '19 at 23:35
  • This helped me a lot. Worked well for my need to run on_conflict_do_nothing() with some small changes to your code example. – David Wickstrom Aug 28 '20 at 07:41
  • @deed02392 I'm having the same error, did you find a solution? – avocado Jun 20 '21 at 15:33
  • Hi @avocado, I must have done in the end, but I guess it was something obvious hence I didn't update here again. We can have a chat if you can't figure it out. – deed02392 Jun 21 '21 at 15:12
2

An alternative approach using compilation extension (https://docs.sqlalchemy.org/en/13/core/compiler.html):

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def compile_upsert(insert_stmt, compiler, **kwargs):
    """
    converts every SQL insert to an upsert  i.e;
    INSERT INTO test (foo, bar) VALUES (1, 'a')
    becomes:
    INSERT INTO test (foo, bar) VALUES (1, 'a') ON CONFLICT(foo) DO UPDATE SET (bar = EXCLUDED.bar)
    (assuming foo is a primary key)
    :param insert_stmt: Original insert statement
    :param compiler: SQL Compiler
    :param kwargs: optional arguments
    :return: upsert statement
    """
    pk = insert_stmt.table.primary_key
    insert = compiler.visit_insert(insert_stmt, **kwargs)
    ondup = f'ON CONFLICT ({",".join(c.name for c in pk)}) DO UPDATE SET'
    updates = ', '.join(f"{c.name}=EXCLUDED.{c.name}" for c in insert_stmt.table.columns)
    upsert = ' '.join((insert, ondup, updates))
    return upsert

This should ensure that all insert statements behave as upserts.

danielcahall
  • 2,672
  • 8
  • 14