Is there an elegant way to do an `INSERT ... ON DUPLICATE KEY UPDATE` in SQLAlchemy? I mean something with a syntax similar to `inserter.insert().execute(list_of_dictionaries)`?

11 Answers
`ON DUPLICATE KEY UPDATE` post version-1.2 for MySQL
This functionality is now built into SQLAlchemy for MySQL only. somada141's answer below has the best solution: https://stackoverflow.com/a/48373874/319066
`ON DUPLICATE KEY UPDATE` in the SQL statement
If you want the generated SQL to actually include `ON DUPLICATE KEY UPDATE`, the simplest way involves using a `@compiles` decorator.
The code for an example (linked from a good thread on the subject on reddit) can be found on github:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def append_string(insert, compiler, **kw):
    # Compile the INSERT as usual, then append the caller-supplied suffix.
    s = compiler.visit_insert(insert, **kw)
    if 'append_string' in insert.kwargs:
        return s + " " + insert.kwargs['append_string']
    return s

my_connection.execute(my_table.insert(append_string = 'ON DUPLICATE KEY UPDATE foo=foo'), my_values)
Note that with this approach you have to build the `append_string` value by hand. You could probably change the `append_string` function so that it generates the `ON DUPLICATE KEY UPDATE` clause from the insert automatically, but I'm not going to do that here due to laziness.
`ON DUPLICATE KEY UPDATE` functionality within the ORM
SQLAlchemy does not provide an interface to `ON DUPLICATE KEY UPDATE` or `MERGE` or any other similar functionality in its ORM layer. Nevertheless, it has the `session.merge()` function that can replicate the functionality only if the key in question is a primary key.
`session.merge(ModelObject)` first checks if a row with the same primary key value exists by sending a `SELECT` query (or by looking it up locally). If it does, it sets a flag somewhere indicating that ModelObject is in the database already, and that SQLAlchemy should use an `UPDATE` query. Note that merge is quite a bit more complicated than this, but it replicates the functionality well with primary keys.
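To make the primary-key behaviour concrete, here is a minimal sketch of `session.merge()` acting as an upsert. The `Item` model and the in-memory SQLite engine are stand-ins I made up for illustration (nothing MySQL-specific is involved), and it assumes SQLAlchemy 1.4 or later:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Item(Base):
    # Hypothetical model used only for this sketch.
    __tablename__ = "item"
    id = Column(Integer, primary_key=True)
    name = Column(String(30))


# In-memory SQLite keeps the example self-contained.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Item(id=1, name="old"))
    session.commit()

    # Primary key 1 already exists, so merge() copies the new state onto
    # the persistent object and the next flush emits an UPDATE.
    session.merge(Item(id=1, name="new"))
    # Primary key 2 does not exist, so merge() adds a new object (INSERT).
    session.merge(Item(id=2, name="fresh"))
    session.commit()

    rows = [(item.id, item.name) for item in session.query(Item).order_by(Item.id)]

print(rows)
```

Keep in mind this only matches on the primary key; a clash on some other unique key would still raise an integrity error.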
But what if you want `ON DUPLICATE KEY UPDATE` functionality with a non-primary key (for example, another unique key)? Unfortunately, SQLAlchemy doesn't have any such function. Instead, you have to create something that resembles Django's `get_or_create()`. Another StackOverflow answer covers it, and I'll just paste a modified, working version of it here for convenience.
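from sqlalchemy.sql.expression import ClauseElement

def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance
    else:
        # Skip SQL expressions; only plain values can be passed to the constructor.
        params = dict((k, v) for k, v in kwargs.items() if not isinstance(v, ClauseElement))
        if defaults:
            params.update(defaults)
        instance = model(**params)
        session.add(instance)  # queue the new row; the caller still has to commit
        return instance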

-
Note that the `append_string` code is non-functional on postgres (with its new `ON CONFLICT [IGNORE|UPDATE]` feature in 9.5), as the ORM automatically appends a `RETURNING {primary key}` to inserts, and that results in invalid SQL. – Fake Name Aug 10 '15 at 03:03
-
what is the `foo=foo` part doing here, and what would I replace `foo` with in my own table? – nhinkle Jan 07 '17 at 00:53
-
`append_string` does not work; I get `SAWarning: Can't validate argument 'append_string'; can't locate any SQLAlchemy dialect named 'append' % (k, dialect_name)` – wyx May 27 '19 at 14:33
-
Note that the get_or_create example is exposed to race conditions on concurrent systems. Instead you should try to insert first, catch an exception on key duplication and query the result. – Korenz Feb 15 '21 at 14:27
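The insert-first approach suggested in the last comment could look roughly like the sketch below. The `create_or_get` helper, the `User` model and the in-memory SQLite engine are all hypothetical names invented for this illustration, and it assumes SQLAlchemy 1.4 or later:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):
    # Hypothetical model with a non-primary unique key (email).
    __tablename__ = "user_account"
    id = Column(Integer, primary_key=True)
    email = Column(String(100), unique=True, nullable=False)
    name = Column(String(50))


def create_or_get(session, model, lookup, defaults=None):
    """Try the INSERT first; on a duplicate key, fall back to a query.

    Returns (instance, created).
    """
    instance = model(**lookup, **(defaults or {}))
    session.add(instance)
    try:
        session.commit()
        return instance, True
    except IntegrityError:
        # Caveat: this rolls back the whole session, including any
        # unrelated pending changes.
        session.rollback()
        return session.query(model).filter_by(**lookup).one(), False


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

first, created_first = create_or_get(
    session, User, {"email": "a@example.com"}, {"name": "A"})
second, created_second = create_or_get(
    session, User, {"email": "a@example.com"}, {"name": "B"})
print(created_first, created_second, second.name)
```

Because the database enforces the unique key, two concurrent callers cannot both insert the same row; the loser simply gets the existing one back. Note that the existing row is returned unchanged, so this is a get-or-create, not an update.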
I should mention that ever since the v1.2 release, the SQLAlchemy 'core' has a built-in solution to the above, which can be seen under here (copied snippet below):
from sqlalchemy.dialects.mysql import insert

insert_stmt = insert(my_table).values(
    id='some_existing_id',
    data='inserted value')

on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
    data=insert_stmt.inserted.data,
    status='U'
)

conn.execute(on_duplicate_key_stmt)

-
yeah I should've clarified. The above only works for MySQL *but* Postgres for example has had such functionality for some time now with http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#sqlalchemy.dialects.postgresql.dml.Insert.on_conflict_do_update and http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#sqlalchemy.dialects.postgresql.dml.Insert.on_conflict_do_nothing – somada141 Jun 04 '18 at 21:00
-
This also works for arrays of values, if anyone ever needs that functionality. That means that `values` also accepts `list`s of `dict` objects. – sheba Nov 28 '18 at 14:47
-
May you please give a solid example of how `data` is populated? Is it like `data={'field_1'='value1'}`. Thanks – Houman Sep 04 '19 at 17:57
-
@Houman yeah that's how you define the values and then you use `mysql_noop_upsert_columns=[OrmClass.column]` to define the update columns. – somada141 Sep 05 '19 at 07:13
-
does this work for bulk upserts too? Because I have not managed to get it to work – Kailegh Feb 11 '20 at 11:43
-
@Kailegh the above example is an UPSERT (since it INSERTs and UPDATEs on a dupe). Did you mean something else? – somada141 Feb 11 '20 at 18:03
-
yeah, I mean does this work on a list of rows at the same time? Or should I do it one by one? Using an array of dicts in values, I have not found it in their documentation – Kailegh Feb 11 '20 at 18:54
-
@Kailegh yes it does. The `values` argument can be a `list` of `dict` with the field names you want inserted and then you can use the `mysql_upsert_columns` argument to define which columns to be upserted in the event of a dupe. SQLAlchemy will then compile the appropriate query to only update the right columns on the dupes – somada141 Feb 12 '20 at 06:21
-
I have a unique index and an auto-increment id primary key. In my case the id keeps incrementing; how can I resolve this issue? – M.Abulsoud Apr 22 '20 at 15:14
-
@somada141 I couldn't get it working with a list of dict. I assume that you'd pass the list of dict to insert statement first. What would be the required arguments to `on_duplicate_key_update` in this case? – atuljangra Sep 02 '21 at 18:17
-
@atuljangra Did you manage to get `on_duplicate_key_update` working with a list of dict? – clumdee Jun 09 '22 at 04:19
-
@somada141 please explain your comment about `mysql_upsert_columns` which is not in your answer. Same for `mysql_noop_upsert_columns`. Thx in adv – chrisinmtown Feb 17 '23 at 21:22
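Since several comments ask how a list of dicts fits together with `on_duplicate_key_update`, here is a small sketch that only compiles the statement, so no MySQL server or driver is needed. The table definition is made up for the example; the update columns are passed as keyword arguments referring to `stmt.inserted`, as in the snippet from the answer above:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.mysql import insert

metadata = MetaData()
# Hypothetical table used only for this sketch.
my_table = Table(
    "my_table",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("data", String(50)),
    Column("status", String(1)),
)

# Every dict should carry the same keys.
rows = [
    {"id": 1, "data": "a"},
    {"id": 2, "data": "b"},
]

stmt = insert(my_table).values(rows)
# On a duplicate key, take `data` from the row that failed to insert
# and set `status` to a literal value.
stmt = stmt.on_duplicate_key_update(data=stmt.inserted.data, status="U")

# Compile against the MySQL dialect to inspect the generated SQL.
sql = str(stmt.compile(dialect=mysql.dialect()))
print(sql)
```

To run it for real, pass `stmt` to `conn.execute()` on a MySQL connection instead of compiling it.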
Based on phsource's answer, and for the specific use-case of using MySQL and completely overriding the data for the same key without performing a `DELETE` statement, one can use the following `@compiles` decorated insert expression:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def append_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    if insert.kwargs.get('on_duplicate_key_update'):
        # Pull the column names out of the first parenthesised list of the INSERT.
        fields = s[s.find("(") + 1:s.find(")")].replace(" ", "").split(",")
        generated_directive = ["{0}=VALUES({0})".format(field) for field in fields]
        return s + " ON DUPLICATE KEY UPDATE " + ",".join(generated_directive)
    return s
-
This example doesn't escape the field values very well. You should probably use the built-in escape methods: https://stackoverflow.com/a/25107658/319066 – phsource Dec 25 '18 at 20:06
-
@phsource notice that in this example we override fields with their values from the original `INSERT` (referring to the field names and not the values), so no escaping is needed. Obviously, using the now-part-of-the-ORM functionality is better (unless using it with `INSERT FROM SELECT` which does not work as expected) – sheba Dec 28 '18 at 13:27
My way
import typing
from datetime import datetime
from sqlalchemy.dialects import mysql

class MyRepository:
    def model(self):
        return MySqlAlchemyModel

    def upsert(self, data: typing.List[typing.Dict]):
        if not data:
            return
        model = self.model()
        if hasattr(model, 'created_at'):
            for item in data:
                item['created_at'] = datetime.now()

        stmt = mysql.insert(getattr(model, '__table__')).values(data)
        # Update every column supplied in the first row when a duplicate key is hit.
        for_update = []
        for k, v in data[0].items():
            for_update.append(k)

        dup = {k: getattr(stmt.inserted, k) for k in for_update}
        stmt = stmt.on_duplicate_key_update(**dup)
        self.db.session.execute(stmt)
        self.db.session.commit()
Usage:
myrepo.upsert([
    {
        "field1": "value11",
        "field2": "value21",
        "field3": "value31",
    },
    {
        "field1": "value12",
        "field2": "value22",
        "field3": "value32",
    },
])

It depends on what you want. If you want to replace, pass `OR REPLACE` in the prefixes:
def bulk_insert(self, objects, table):
    # table: your table class; objects: a list of dictionaries [{col1: val1, col2: val2}]
    # Note: OR IGNORE / OR REPLACE are SQLite prefixes.
    for counter, row in enumerate(objects):
        inserter = table.__table__.insert().prefix_with('OR IGNORE').values(row)
        try:
            self.db.execute(inserter)
        except Exception as E:
            print(E)
        if counter % 100 == 0:
            self.db.commit()
    self.db.commit()
The commit interval (every 100 rows here) can be changed to speed things up or slow them down.
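To show what the two prefixes actually do, here is a minimal sketch against an in-memory SQLite database. The `kv` table and the `load` helper are made up for the illustration, and it assumes SQLAlchemy 1.4 or later:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, select

metadata = MetaData()
# Hypothetical table used only for this sketch.
kv = Table(
    "kv",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("val", String(20)),
)


def load(prefix, rows):
    """Insert rows one by one with the given SQLite prefix, return the table."""
    engine = create_engine("sqlite://")  # fresh in-memory database per call
    metadata.create_all(engine)
    with engine.begin() as conn:
        for row in rows:
            # Renders as "INSERT OR IGNORE INTO kv ..." or "INSERT OR REPLACE INTO kv ..."
            conn.execute(kv.insert().prefix_with(prefix).values(row))
        result = conn.execute(select(kv.c.id, kv.c.val).order_by(kv.c.id))
        return [tuple(r) for r in result]


rows = [
    {"id": 1, "val": "first"},
    {"id": 1, "val": "second"},  # duplicate primary key
    {"id": 2, "val": "third"},
]

ignored = load("OR IGNORE", rows)    # the duplicate is skipped
replaced = load("OR REPLACE", rows)  # the duplicate overwrites the earlier row
print(ignored)
print(replaced)
```

With `OR IGNORE` the first value for a key wins; with `OR REPLACE` the last one does.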

The other answers have this covered but figured I'd reference another good example for mysql I found in this gist. This also includes the use of `LAST_INSERT_ID`, which may be useful depending on your innodb auto increment settings and whether your table has a unique key. Lifting the code here for easy reference but please give the author a star if you find it useful.
from app import db
from sqlalchemy import func
from sqlalchemy.dialects.mysql import insert

def upsert(model, insert_dict):
    """model can be a db.Model or a table(), insert_dict should contain a primary or unique key."""
    inserted = insert(model).values(**insert_dict)
    upserted = inserted.on_duplicate_key_update(
        id=func.LAST_INSERT_ID(model.id), **{k: inserted.inserted[k]
                                             for k, v in insert_dict.items()})
    res = db.engine.execute(upserted)
    return res.lastrowid

ORM: use an upsert func based on `on_duplicate_key_update`
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.orm import Session

# engine, ORM_Base and idWorker are assumed to be defined elsewhere in the application.

class Model():
    __input_data__ = dict()

    def __init__(self, **kwargs) -> None:
        self.__input_data__ = kwargs
        self.session = Session(engine)

    def save(self):
        self.session.add(self)
        self.session.commit()

    def upsert(self, *, ingore_keys = []):
        # Keep only the input values that correspond to real table columns.
        column_keys = self.__table__.columns.keys()
        udpate_data = dict()
        for key in self.__input_data__.keys():
            if key not in column_keys:
                continue
            else:
                udpate_data[key] = self.__input_data__[key]
        insert_stmt = insert(self.__table__).values(**udpate_data)

        # 'id' is never overwritten on a duplicate; callers can exclude more columns.
        all_ignore_keys = ['id']
        if isinstance(ingore_keys, list):
            all_ignore_keys =[*all_ignore_keys, *ingore_keys]
        else:
            all_ignore_keys.append(ingore_keys)

        udpate_columns = dict()
        for key in self.__input_data__.keys():
            if key not in column_keys or key in all_ignore_keys:
                continue
            else:
                udpate_columns[key] = insert_stmt.inserted[key]
        on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
            **udpate_columns
        )
        # self.session.add(self)
        self.session.execute(on_duplicate_key_stmt)
        self.session.commit()

class ManagerAssoc(ORM_Base, Model):
    def __init__(self, **kwargs):
        self.id = idWorker.get_id()
        column_keys = self.__table__.columns.keys()
        udpate_data = dict()
        for key in kwargs.keys():
            if key not in column_keys:
                continue
            else:
                udpate_data[key] = kwargs[key]
        ORM_Base.__init__(self, **udpate_data)
        Model.__init__(self, **kwargs, id = self.id)

....
# you can call it as following:
manager_assoc.upsert()
manager.upsert(ingore_keys = ['manager_id'])

-
apologies if this is excessively picky, maybe you'd like to correct `udpate` to `update`? – chrisinmtown Feb 17 '23 at 20:34
Got a simpler solution:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def replace_string(insert, compiler, **kw):
    # Rewrites every compiled INSERT statement into a REPLACE.
    s = compiler.visit_insert(insert, **kw)
    s = s.replace("INSERT INTO", "REPLACE INTO")
    return s

my_connection.execute(my_table.insert(replace_string=""), my_values)

-
Take care. `REPLACE INTO` and `INSERT ... ON DUPLICATE KEY UPDATE` do different things. – Dennis S Hennen Mar 26 '14 at 14:21
-
Notably, it *deletes* the row, so this solution is usually terribly useless on `InnoDB` (or any other transactional engine) tables, as it chokes on most any `FOREIGN KEY` constraint – Naltharial Mar 31 '14 at 12:07
-
It works fine with MySql. Having said that, I do not have any foreign keys on that table. – algarecu Oct 15 '15 at 09:59
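The difference the comments point out can be seen with a small sketch. To keep it runnable without a MySQL server it uses Python's built-in `sqlite3`, whose `REPLACE INTO` also deletes the conflicting row before inserting; SQLite's `ON CONFLICT ... DO UPDATE` (which needs SQLite 3.24 or later) stands in for MySQL's `ON DUPLICATE KEY UPDATE`. The `page` table is made up for the example:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE page (id INTEGER PRIMARY KEY, title TEXT, visits INTEGER DEFAULT 0)"
)


def reset():
    """Start each experiment from a single row with visits = 5."""
    conn.execute("DELETE FROM page")
    conn.execute("INSERT INTO page (id, title, visits) VALUES (1, 'old', 5)")


# REPLACE deletes the old row and inserts a new one, so the column that
# was not supplied (visits) falls back to its default.
reset()
conn.execute("REPLACE INTO page (id, title) VALUES (1, 'new')")
after_replace = conn.execute("SELECT id, title, visits FROM page").fetchone()

# An upsert updates the existing row in place, so visits is preserved.
reset()
conn.execute(
    "INSERT INTO page (id, title) VALUES (1, 'new') "
    "ON CONFLICT(id) DO UPDATE SET title = excluded.title"
)
after_upsert = conn.execute("SELECT id, title, visits FROM page").fetchone()

print(after_replace)
print(after_upsert)
```

So `REPLACE` is only equivalent when every column is supplied and nothing else references the row being deleted.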
I just used plain sql as:
from sqlalchemy import text

insert_stmt = text("REPLACE INTO tablename (column1, column2) VALUES (:column_1_bind, :column_2_bind)")
session.execute(insert_stmt, data)

Update Feb 2023: SQLAlchemy version 2 was recently released and supports `on_duplicate_key_update` in the MySQL dialect. Many many thanks to Federico Caselli of the SQLAlchemy project who helped me develop sample code in a discussion at https://github.com/sqlalchemy/sqlalchemy/discussions/9328
Please see https://stackoverflow.com/a/75538576/1630244
If it's ok to post the same answer twice (?) here is my small self-contained code example:
import sqlalchemy as db
import sqlalchemy.dialects.mysql as mysql
from sqlalchemy import delete, select, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "foo"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))
engine = db.create_engine('mysql+mysqlconnector://USER-NAME-HERE:PASS-WORD-HERE@localhost/SCHEMA-NAME-HERE')
conn = engine.connect()
# setup step 0 - ensure the table exists
Base.metadata.create_all(bind=engine)
# setup step 1 - clean out rows with id 1..5
del_stmt = delete(User).where(User.id.in_([1, 2, 3, 4, 5]))
conn.execute(del_stmt)
conn.commit()
sel_stmt = select(User)
users = list(conn.execute(sel_stmt))
print(f'Table size after cleanout: {len(users)}')
# setup step 2 - insert 4 rows
ins_stmt = mysql.insert(User).values(
    [
        {"id": 1, "name": "x"},
        {"id": 2, "name": "y"},
        {"id": 3, "name": "w"},
        {"id": 4, "name": "z"},
    ]
)
conn.execute(ins_stmt)
conn.commit()
users = list(conn.execute(sel_stmt))
print(f'Table size after insert: {len(users)}')
# demonstrate upsert
ups_stmt = mysql.insert(User).values(
    [
        {"id": 1, "name": "xx"},
        {"id": 2, "name": "yy"},
        {"id": 3, "name": "ww"},
        {"id": 5, "name": "new"},
    ]
)
ups_stmt = ups_stmt.on_duplicate_key_update(name=ups_stmt.inserted.name)
# if you want to see the compiled result
# x = ups_stmt.compile(dialect=mysql.dialect())
# print(x.string, x.construct_params())
conn.execute(ups_stmt)
conn.commit()
users = list(conn.execute(sel_stmt))
print(f'Table size after upsert: {len(users)}')

None of these solutions seem all that elegant. A brute-force way is to query to see if the row exists. If it does, delete the row and then insert; otherwise just insert. Obviously there is some overhead involved, but it does not rely on modifying the raw SQL and it works on non-ORM stuff.

-
@strangeqargo As someone new to database management I'd love to know more about why that's such a bad idea. – Nick Nov 01 '22 at 00:56
-
@Nick to "check if row exists" means you can 1) start a transaction, try to find a row, end the transaction. That's slow. 2) you can forget to wrap it in a transaction, and that's prone to errors (the row you search for might appear while you're searching). The best way is to use database-provided mechanisms - on duplicate key update (IF you have a correct key structure). – strangeqargo Mar 27 '23 at 05:06