From the PostgreSQL documentation on INSERT:
On successful completion, an INSERT command returns a command tag of the form
INSERT oid count
The count is the number of rows inserted or updated. If count is exactly one, and the target table has OIDs, then oid is the OID assigned to the inserted row. The single row must have been inserted rather than updated. Otherwise oid is zero.
Thus, for a single-row statement on a table with OIDs, it is possible to distinguish an update from an insert by examining the command tag.
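As a rough illustration of that check, here is a minimal sketch that parses a command tag string of the form INSERT oid count. The names parse_insert_tag and single_row_was_inserted are hypothetical helpers, not part of any library. With psycopg2 the tag should be available as cursor.statusmessage, but treat that as an assumption about your driver.

```python
import re
from typing import NamedTuple


class InsertTag(NamedTuple):
    oid: int
    count: int


# Matches the documented command tag form: INSERT oid count
_TAG_RE = re.compile(r"^INSERT (\d+) (\d+)$")


def parse_insert_tag(tag: str) -> InsertTag:
    """Split an INSERT command tag into its oid and count parts."""
    match = _TAG_RE.match(tag.strip())
    if match is None:
        raise ValueError(f"not an INSERT command tag: {tag!r}")
    return InsertTag(oid=int(match.group(1)), count=int(match.group(2)))


def single_row_was_inserted(tag: str) -> bool:
    """True only when exactly one row was affected and it got a non-zero OID.

    A zero oid with count == 1 means the row was updated (or the table
    has no OIDs), so this check is only meaningful on tables with OIDs.
    """
    parsed = parse_insert_tag(tag)
    return parsed.count == 1 and parsed.oid != 0
```

For example, single_row_was_inserted("INSERT 0 1") is False, which on a table with OIDs indicates that the conflicting row was updated rather than inserted.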
A table can be created with OIDs using the syntax CREATE TABLE mytable (...) WITH OIDS, or OIDs may be enabled on an existing table with ALTER TABLE mytable SET WITH OIDS. Note that WITH OIDS was removed in PostgreSQL 12, so this approach applies to earlier versions only.
Using SQLAlchemy, a table with OIDs may be created as follows:
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
engine = sa.create_engine('postgresql+psycopg2://user:pass@hostname:port/db')
class Person(Base):
    __tablename__ = 'people'
    __table_args__ = {'postgresql_with_oids': True}
    email = sa.Column(sa.Text, primary_key=True)
    name = sa.Column(sa.Text, nullable=False)
Base.metadata.create_all(engine)
The INSERT ... ON CONFLICT statement may then be constructed like this:
p = {'email': 'hal@hal.hal', 'name': 'Hally hal'}
stmt = pg_insert(Person).values(p)
stmt = stmt.on_conflict_do_update(
    index_elements=[Person.email],
    set_={'name': stmt.excluded.name},
)
Finally, once the statement is executed, a result proxy is returned whose lastrowid property corresponds to the oid in the command tag INSERT oid count.
Thus, the first time you execute stmt,
r = engine.execute(stmt)
r.lastrowid
will output an integer > 0, since a new row is created. Every subsequent time, r.lastrowid will output 0, since the existing row is updated instead.
If you need to track upserts of multiple rows at a time, you can add extra columns as flags that are set in the ON CONFLICT DO UPDATE portion of your insert statement.
There are many ways to do this, depending on the exact requirements. Here is one alternative.
Add an extra column conflict_updated_at = sa.Column(sa.DateTime(timezone=True))
Change the upsert definition to:
stmt = pg_insert(Person).values(p)
stmt = stmt.on_conflict_do_update(
    index_elements=[Person.email],
    set_={
        'name': stmt.excluded.name,
        'conflict_updated_at': sa.func.now(),
    },
)
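To read the flag back, one option is to return the flag column from the upsert and split the rows on whether it is set. The sketch below is plain Python and makes several assumptions: the statement has been extended with something like stmt.returning(Person.email, Person.conflict_updated_at), the rows are fetched as mappings, and conflict_updated_at has no default, so freshly inserted rows leave it NULL. The function name split_upserted and the sample rows are hypothetical.

```python
from datetime import datetime, timezone
from typing import Any, Iterable, List, Mapping, Tuple


def split_upserted(
    rows: Iterable[Mapping[str, Any]],
) -> Tuple[List[str], List[str]]:
    """Split upserted rows into (inserted emails, updated emails).

    Assumes conflict_updated_at is only ever written by the
    ON CONFLICT DO UPDATE branch, so None means the row was inserted.
    """
    inserted: List[str] = []
    updated: List[str] = []
    for row in rows:
        if row["conflict_updated_at"] is None:
            inserted.append(row["email"])
        else:
            updated.append(row["email"])
    return inserted, updated


# Hypothetical rows, shaped like the result of a RETURNING clause.
rows = [
    {"email": "new@example.com", "conflict_updated_at": None},
    {
        "email": "hal@hal.hal",
        "conflict_updated_at": datetime(2020, 1, 1, tzinfo=timezone.utc),
    },
]
inserted, updated = split_upserted(rows)
```

This only works when the rows are inspected in the result of the same statement. A row that was updated by an earlier upsert keeps its timestamp, so a later query against the table cannot tell which statement touched it without comparing timestamps.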