
Using SQLAlchemy with a MariaDB backend, I need to bulk upsert data. Using this answer I was able to make it work for a model with a single primary key. However, I can't make it work with composite keys.

The key part of the code is this one:

    # for single pk
    primary_key = [key.name for key in inspect(model).primary_key][0]
    # get all entries to be updated
    for each in DBSession.query(model).filter(getattr(model, primary_key).in_(entries.keys())).all():
        entry = entries.pop(str(getattr(each, primary_key)))

I tried to change it to make it work with composite keys:

    primary_keys = tuple([key.name for key in inspect(model).primary_key])
    # get all entries to be updated
    for each in DBSession.query(model).filter(and_(*[getattr(model, col).in_(entries.keys()) for col in primary_keys])).all():
        print("This is never printed :(")

I guess this `DBSession.query(model).filter(and_(*[getattr(model, col).in_(entries.keys()) for col in primary_keys])).all()` doesn't work as intended: `entries.keys()` contains key *tuples*, so checking each individual column against them never matches anything.
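For what it's worth, SQLAlchemy's `tuple_` construct (already imported in the snippet below) seems to express a composite-key `IN` filter directly, comparing the whole key tuple at once. A minimal sketch against an in-memory SQLite database, just to show the filter matching (the model and data mirror the `Users` example below):

```python
from sqlalchemy import Column, String, create_engine, inspect, tuple_
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Users(Base):
    __tablename__ = "users"
    user = Column(String(50), primary_key=True)
    account = Column(String(50), primary_key=True)
    comment = Column(String(50))

# In-memory SQLite stands in for MariaDB here, purely to demonstrate the filter.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add(Users(user="user1", account="account1", comment="old"))
session.commit()

entries = {
    ("user1", "account1"): {"user": "user1", "account": "account1", "comment": ""},
    ("user1", "account2"): {"user": "user1", "account": "account2", "comment": ""},
}
# Compare the whole primary-key tuple at once instead of each column separately.
pk_cols = [getattr(Users, key.name) for key in inspect(Users).primary_key]
matches = (
    session.query(Users)
    .filter(tuple_(*pk_cols).in_(list(entries.keys())))
    .all()
)
print([(m.user, m.account) for m in matches])
```

Only the row whose full `(user, account)` tuple appears in `entries.keys()` should come back, leaving the remaining entries for the insert path.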

For reference, here is a fully working snippet:

from sqlalchemy import Column, create_engine, and_, inspect, tuple_
from sqlalchemy.types import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker

DBSession = scoped_session(sessionmaker())

Base = declarative_base()

class Accounts(Base):
    __tablename__ = 'accounts'
    account  = Column(String(50), primary_key=True)
    comment  = Column(String(50))

class Users(Base):
    __tablename__ = 'users'
    user  = Column(String(50), primary_key=True)
    account  = Column(String(50), primary_key=True)
    comment  = Column(String(50))

accounts_data = {"account1": {"account": "account1", "comment": "test"}, "account2": {"account": "account2", "comment": None}}

users_data = {("user1", "account1"): {"user": "user1", "account": "account1", "comment": ""}, ("user1", "account2"): {"user": "user1", "account": "account2", "comment": ""}}

def upsert_data_single_pk(entries, model):
    primary_key = [key.name for key in inspect(model).primary_key][0]
    entries_to_update = []
    entries_to_insert = []
    
    # get all entries to be updated
    for each in DBSession.query(model).filter(getattr(model, primary_key).in_(entries.keys())).all():
        entry = entries.pop(str(getattr(each, primary_key)))
        entries_to_update.append(entry)
        
    # get all entries to be inserted
    for entry in entries.values():
        entries_to_insert.append(entry)

    DBSession.bulk_insert_mappings(model, entries_to_insert)
    DBSession.bulk_update_mappings(model, entries_to_update)

    DBSession.commit()

def upsert_data_multiple_pk(entries, model):
    primary_keys = tuple([key.name for key in inspect(model).primary_key])
    entries_to_update = []
    entries_to_insert = []
    
    # get all entries to be updated
    for each in DBSession.query(model).filter(and_(*[getattr(model, col).in_(entries.keys()) for col in primary_keys])).all():
        # print the composite primary key by concatenating the individual column values
        print('-'.join([str(getattr(each, col)) for col in primary_keys]))
        
    # get all entries to be inserted
    for entry in entries.values():
        entries_to_insert.append(entry)

    DBSession.bulk_insert_mappings(model, entries_to_insert)
    DBSession.bulk_update_mappings(model, entries_to_update)

    DBSession.commit()

db_connection_uri = "mysql+pymysql://XXXX:XXXX@XXXX:XXXX/XXXX?charset=utf8mb4"
engine = create_engine(db_connection_uri, echo=False)
DBSession.remove()
DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)

#Base.metadata.drop_all(engine, checkfirst=True)
Base.metadata.create_all(bind=engine)

#upsert_data_single_pk(accounts_data, Accounts)
upsert_data_multiple_pk(users_data, Users)
Shan-x
  • Instead of doing all this work on the client side, which will have race conditions, consider just doing a single REPLACE statement or INSERT...ON DUPLICATE KEY UPDATE statement (I don't know sqlalchemy so can't tell you how you do either in that) – ysth Dec 15 '22 at 10:11
  • what about just upserting with pandas, and if need be alter the table to have the correct primary keys? https://stackoverflow.com/a/40770849/9490769 – oskros Dec 15 '22 at 10:13
  • @ysth sadly sqlalchemy proposes replace only with the ORM (not the core). From what I know this ORM is really inefficient with large data and is not good for bulk insert. – Shan-x Dec 15 '22 at 10:18
  • If you do want to do a query with multiple values for a composite key, you do `WHERE (user, account) IN (("user1", "account1"),("user1", "account2"),...)`. Don't know what that looks like in sqlalchemy – ysth Dec 15 '22 at 10:19
  • Seems like there should be some way to do bulk_insert_mappings but with REPLACE instead of INSERT. – ysth Dec 15 '22 at 10:22
  • Does this help? https://gist.github.com/timtadh/7811458 – ysth Dec 15 '22 at 10:30

1 Answer


I wrote a different function to do what I needed:

    # Assumes `insert` is sqlalchemy.dialects.mysql.insert, `logger` is a configured
    # logger, and ModelTools.get_primary_keys returns the model's primary-key names.
    def upsert(self, model: Type[Base], data: List[Dict[str, Any]]) -> None:
        """Upsert a record into the database.

        If the record already exists, it will be updated. If it does not exist, it will be inserted.

        Parameters:
            model: The SQLAlchemy model representing the table.
            data: The data to be inserted or updated, as a list of dictionaries.
        """
        if not data:
            logger.info("No data to insert")
            return None
        logger.info(f"{len(data)} rows to insert/update to {model.__table__}")
        insert_stmt = insert(model.__table__).values(data)
        primary_keys = ModelTools.get_primary_keys(model)
        to_update = {
            k: getattr(insert_stmt.inserted, k)
            for k in data[0].keys()
            if k not in primary_keys
        }

        on_conflict_stmt = insert_stmt.on_duplicate_key_update(**to_update)
        self.engine.execute(on_conflict_stmt)

It is probably not the most time-efficient solution, but it works as intended, so for now I'm keeping it.
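For reference, the statement this builds compiles to a single multi-row `INSERT ... ON DUPLICATE KEY UPDATE`, which lets the server resolve the composite-key conflicts instead of the client. A minimal standalone sketch (compiling only, no database needed; the table definition mirrors the question's `Users` model):

```python
from sqlalchemy import Column, MetaData, String, Table
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.mysql import insert

metadata = MetaData()
users = Table(
    "users", metadata,
    Column("user", String(50), primary_key=True),
    Column("account", String(50), primary_key=True),
    Column("comment", String(50)),
)

data = [
    {"user": "user1", "account": "account1", "comment": "a"},
    {"user": "user1", "account": "account2", "comment": "b"},
]
insert_stmt = insert(users).values(data)
# Update every non-key column to the value that would have been inserted.
to_update = {
    k: getattr(insert_stmt.inserted, k)
    for k in data[0]
    if k not in ("user", "account")
}
stmt = insert_stmt.on_duplicate_key_update(**to_update)
print(stmt.compile(dialect=mysql.dialect()))
```

Compiling with the MySQL/MariaDB dialect shows the `ON DUPLICATE KEY UPDATE` clause appended after the multi-row `VALUES` list.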

Shan-x