0

I'd like to use the ON DUPLICATE KEY UPDATE optionality provided by SQLAlchemy to upsert a bunch of records.

These records have been sucessfully inserted with python using the following (where connection is engine.connect() object and table is a Table object)

record_list = [{'col1': 'name1', 'col2': '2015-01-31', 'col3': 27.2},
               {'col1': 'name1', 'col2': '2016-01-31', 'col3': 25.2}]
query = insert(table)
results = connection.execute(query, record_list)

Looking at the docs at https://docs.sqlalchemy.org/en/13/dialects/mysql.html#insert-on-duplicate-key-update-upsert as well as a number of SO questions (including the suggestion it's possible under the comments on SQLAlchemy ON DUPLICATE KEY UPDATE ) I've tried a number of different examples, but there were none that I could see that address multiple records with the upsert statement using this method.

I'm trying along the lines of

query = insert(table).values(record_list)
upsert_query = query.on_duplicate_key_update()
results = connection.execute(upsert_query)

but either get the issue that the .on_duplicate_key_update() requires cant be empty or that the SQL syntax is wrong.

If anyone has sucessfully managed and could help me with the code structure here I'd really appreciate it.

kowpow
  • 95
  • 2
  • 8

4 Answers4

9

I just ran into a similar problem and creating a dictionary out of query.inserted solved it for me.

query = insert(table).values(record_list)
update_dict = {x.name: x for x in query.inserted}
upsert_query = query.on_duplicate_key_update(update_dict)
a5r0n
  • 83
  • 5
user12730260
  • 106
  • 2
1

@user12730260’s answer is great! but has a little bug, the correct code is:

query = insert(table).values(record_list)   # each record is a dict
update_dict = {x.name: x for x in query.inserted}  # specify columns for update, u can filter some of it
upsert_query = query.on_duplicate_key_update(**update_dict) # here's the modification: u should expand the columns dict
songofhawk
  • 91
  • 7
1

Thanks to Federico Caselli of the SQLAlchemy project for explaining how to use on_duplicate_key_update in a discussion https://github.com/sqlalchemy/sqlalchemy/discussions/9328

Here's a Python3 script that demonstrates how to use SQLAlchemy version 2 to implement upsert using on_duplicate_key_update in the MySQL dialect:

import sqlalchemy as db
import sqlalchemy.dialects.mysql as mysql
from sqlalchemy import delete, select, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "foo"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))


engine = db.create_engine('mysql+mysqlconnector://USER-NAME-HERE:PASS-WORD-HERE@localhost/SCHEMA-NAME-HERE')
conn = engine.connect()

# setup step 0 - ensure the table exists
Base().metadata.create_all(bind=engine)

# setup step 1 - clean out rows with id 1..5
del_stmt = delete(User).where(User.id.in_([1, 2, 3, 4, 5]))
conn.execute(del_stmt)
conn.commit()
sel_stmt = select(User)
users = list(conn.execute(sel_stmt))
print(f'Table size after cleanout: {len(users)}')

# setup step 2 - insert 4 rows
ins_stmt = mysql.insert(User).values(
    [
        {"id": 1, "name": "x"},
        {"id": 2, "name": "y"},
        {"id": 3, "name": "w"},
        {"id": 4, "name": "z"},
    ]
)
conn.execute(ins_stmt)
conn.commit()
users = list(conn.execute(sel_stmt))
print(f'Table size after insert: {len(users)}')

# demonstrate upsert
ups_stmt = mysql.insert(User).values(
    [
        {"id": 1, "name": "xx"},
        {"id": 2, "name": "yy"},
        {"id": 3, "name": "ww"},
        {"id": 5, "name": "new"},
    ]
)
ups_stmt = ups_stmt.on_duplicate_key_update(name=ups_stmt.inserted.name)
# if you want to see the compiled result
# x = ups_stmt.compile(dialect=mysql.dialect())
# print(x.string, x.construct_params())
conn.execute(ups_stmt)
conn.commit()

users = list(conn.execute(sel_stmt))
print(f'Table size after upsert: {len(users)}')
chrisinmtown
  • 3,571
  • 3
  • 34
  • 43
0

Your on_duplicate_key_update function requires arguments that define the data to be inserted in the update. Please have a look at the example in the documentation that you have already found.

insert().on_duplicate_key_update({"key": "value"})
julian
  • 451
  • 2
  • 8
  • Thanks for your reply. I tried passing in the record_list again (which is a list of dictionaries) but this didnt work - I think as a dictionary is expected. Any idea what would be required here where multiple records are being updated? Thx – kowpow Dec 11 '19 at 18:12
  • just to clarify the above, I tried insert.on_duplicate_key_update(record_list) – kowpow Dec 11 '19 at 18:23
  • The documentation gives an example of multiple updates, using a dict as well as a list: `on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update( {"data": "some data", "updated_at": func.current_timestamp()}, )` or `on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update( [ ("data", "some data"), ("updated_at", func.current_timestamp()), ], )` – julian Dec 11 '19 at 18:24
  • record_list is a list with a single dict as it's only entry, not a dict. you can inspect `type(record_list)` – julian Dec 11 '19 at 18:26
  • Perhaps I'm misunderstanding, but I see those examples in the docs as updating a single record where multiple columns are identified by the dict key and the record/row is the dict value. In terms of my variable, record_list, it's a list of 2 dictionaries, each of which represents a different record - I'm actually trying to upsert a lot more, but for clarity just limited to 2 in my example. – kowpow Dec 11 '19 at 18:58
  • 1
    @kowpow Did you manage to use `on_duplicate_key_update` with a list of dictionaries? – clumdee Jun 09 '22 at 05:11
  • @clumdee sorry for the delay in coming back. Since asking the original question over two years ago I've gradually moved away from sqlalchemy to using either managing directly through sql strings with an adapter (eg psycopg2 for postgres) or using a third party library like awswrangler. – kowpow Oct 14 '22 at 11:52
  • @kowpow No worries. Thank you for checking back and for sharing an alternative option. – clumdee Oct 16 '22 at 13:42
  • 1
    @clumdee I added an answer above that demonstrates `on_duplicate_key_update` using a MySQL server via the SQLAlchemy MySQL dialect, please see https://stackoverflow.com/a/75538576/1630244 – chrisinmtown Feb 22 '23 at 22:14