Using SQLAlchemy and a MariaDB backend, I need to bulk upsert data. Using this answer I was able to make it work for model with a single primary key. However, I can't make it work with composite keys.
The key part of the code is this one:
# for single pk
primary_key = [key.name for key in inspect(model).primary_key][0]
# get all entries to be updated
for each in DBSession.query(model).filter(getattr(model, primary_key).in_(entries.keys())).all():
entry = entries.pop(str(getattr(each, primary_key)))
I tried to change it to make it work with composite keys:
primary_keys = tuple([key.name for key in inspect(model).primary_key])
# get all entries to be updated
for each in DBSession.query(model).filter(and_(*[getattr(model, col).in_(entries.keys()) for col in primary_keys])).all():
print("This is never printed :(")
I guess this DBSession.query(model).filter(and_(*[getattr(model, col).in_(entries.keys()) for col in primary_keys])).all()
doesn't work as intended.
For reference, here is a fully working snippet:
from sqlalchemy import Column, create_engine, and_, or_
from sqlalchemy.types import String
from sqlalchemy.inspection import inspect
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import inspect, tuple_
DBSession = scoped_session(sessionmaker())
Base = declarative_base()
class Accounts(Base):
__tablename__ = 'accounts'
account = Column(String(50), primary_key=True)
comment = Column(String(50))
class Users(Base):
__tablename__ = 'users'
user = Column(String(50), primary_key=True)
account = Column(String(50), primary_key=True)
comment = Column(String(50))
accounts_data = {"account1": {"account": "account1", "comment": "test"}, "account2": {"account": "account2", "comment": None}}
users_data = {("user1", "account1"): {"user": "user1", "account": "account1", "comment": ""}, ("user1", "account2"): {"user": "user1", "account": "account2", "comment": ""}}
def upsert_data_single_pk(entries, model):
primary_key = [key.name for key in inspect(model).primary_key][0]
entries_to_update = []
entries_to_insert = []
# get all entries to be updated
for each in DBSession.query(model).filter(getattr(model, primary_key).in_(entries.keys())).all():
entry = entries.pop(str(getattr(each, primary_key)))
entries_to_update.append(entry)
# get all entries to be inserted
for entry in entries.values():
entries_to_insert.append(entry)
DBSession.bulk_insert_mappings(model, entries_to_insert)
DBSession.bulk_update_mappings(model, entries_to_update)
DBSession.commit()
def upsert_data_multiple_pk(entries, model):
primary_keys = tuple([key.name for key in inspect(model).primary_key])
entries_to_update = []
entries_to_insert = []
# get all entries to be updated
for each in DBSession.query(model).filter(and_(*[getattr(model, col).in_(entries.keys()) for col in primary_keys])).all():
# Print the composite primary key value by concatenating the values of the individual columns
print('-'.join([str(getattr(each, col)) for col in primary_keys]))
# get all entries to be inserted
for entry in entries.values():
entries_to_insert.append(entry)
DBSession.bulk_insert_mappings(model, entries_to_insert)
DBSession.bulk_update_mappings(model, entries_to_update)
DBSession.commit()
db_connection_uri = "mysql+pymysql://XXXX:XXXX@XXXX:XXXX/XXXX?charset=utf8mb4"
engine = create_engine(db_connection_uri, echo=False)
DBSession.remove()
DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
#Base.metadata.drop_all(engine, checkfirst=True)
Base.metadata.create_all(bind=engine)
#upsert_data_single_pk(accounts_data, Accounts)
upsert_data_multiple_pk(users_data, Users)