
I've been trying various methods to bulk upsert into an Azure SQL (MSSQL) database using SQLAlchemy 2.0. The table is fairly large (~2M records) and I need to bulk upsert 100,000 records, most of which won't already be there.

NOTE: This will run as an Azure Function, so if there is a better approach I'm open to it.

from sqlalchemy import DateTime, String
from sqlalchemy.orm import DeclarativeBase, mapped_column

class Base(DeclarativeBase):
    pass

class issues(Base):
    __tablename__ = "issues"
    id = mapped_column('id', String(36), primary_key=True)
    created = mapped_column('created', DateTime())
    updated = mapped_column('updated', DateTime())
    status = mapped_column('status', String(50))
    severity = mapped_column('severity', String(10))
    control_id = mapped_column('control_id', String(36))
    entity_id = mapped_column('entity_id', String(36))

Example data

from datetime import datetime

issueList = [
    issues(id="1234", created=datetime.now(), updated=datetime.now(), status="Test", severity="Low8", control_id="con123", entity_id="ent123"),
    issues(id="5678", created=datetime.now(), updated=datetime.now(), status="Test", severity="Low9", control_id="con123", entity_id="ent123"),
]

Currently I'm doing session.merge(issue), but it's slow and doesn't support bulk inserts. I've looked at https://stackoverflow.com/a/69968892/1697288 but have been getting errors, as I was passing:

issueList = {
    "1234": { "id": "1234", "created": datetime.now(), "updated": datetime.now(), "status": "Test", "severity": "Low16", "control_id": "con123", "entity_id": "ent123" },
    "5678": { "id": "5678", "created": datetime.now(), "updated": datetime.now(), "status": "Test", "severity": "Low9", "control_id": "con123", "entity_id": "ent123" },
}
upsert_data(session, issueList, "issues", "id")

It seems to be expecting a model, not text, for the third parameter, so I wasn't sure what to send.
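
For illustration, if the linked upsert_data does expect the mapped class, presumably the call would be something like:

upsert_data(session, issueList, issues, "id")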

Any suggestions for a fast approach would be great. Only this application will be inserting data, so locking the DB isn't an issue as long as the lock is cleared on error.

Thanks.


1 Answer


I ended up writing my own function in the end:

import json
import logging

from sqlalchemy import insert, update

log = logging.getLogger(__name__)

Make sure the model is defined, as it will need to be passed in; entries is a list of dictionaries (make sure the dictionary keys match your database column names).
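
As a sketch of the expected shape, assuming the issues model from the question, entries would look something like this:

from datetime import datetime

entries = [
    {"id": "1234", "created": datetime.now(), "updated": datetime.now(),
     "status": "Test", "severity": "Low8", "control_id": "con123", "entity_id": "ent123"},
    {"id": "5678", "created": datetime.now(), "updated": datetime.now(),
     "status": "Test", "severity": "Low9", "control_id": "con123", "entity_id": "ent123"},
]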

The function, with logging and an optional JSON dump controlled by the jsonDump flag (remove as needed):

def upsert_data(session, entries, model, key, jsonDump=False):
    # Nothing to do for an empty list (also avoids a zero batch_size below)
    if not entries:
        log.info("No entries to process")
        return

    batch_size = min(1000, len(entries))

    if jsonDump:
        with open("json/" + model.__tablename__ + "_entries_preprocess.json", "w") as f:
            json.dump(entries, default=str, fp=f)

    # The mapped column used to match records (normally the primary key)
    modelKey = getattr(model, key)

    for i in range(0, len(entries), batch_size):
        log.info("Working Batch %s-%s", i, i + batch_size)

        # Get the next batch
        batch = entries[i:i + batch_size]

        # Find which keys from this batch already exist in the table
        keys_in_batch = [entry.get(key) for entry in batch]
        existing_keys = {
            getattr(row, key)
            for row in session.query(modelKey).filter(modelKey.in_(keys_in_batch)).all()
        }

        # Split the batch: rows already in the DB get updated, the rest get inserted
        entries_to_update = [entry for entry in batch if entry[key] in existing_keys]
        entries_to_insert = [entry for entry in batch if entry[key] not in existing_keys]

        if jsonDump:
            with open("json/" + model.__tablename__ + "_entries_insert.json", "w") as f:
                json.dump(entries_to_insert, default=str, fp=f)

            with open("json/" + model.__tablename__ + "_entries_update.json", "w") as f:
                json.dump(entries_to_update, default=str, fp=f)

        # Completed lists: let sqlalchemy handle the bulk operations
        if entries_to_insert:
            log.info("Handing over to sqlalchemy to INSERT %s records", len(entries_to_insert))
            session.execute(insert(model), entries_to_insert)

        if entries_to_update:
            log.info("Handing over to sqlalchemy to UPDATE %s records", len(entries_to_update))
            session.execute(update(model), entries_to_update)

        # Commit this batch
        log.info("Issuing Database Commit")
        session.commit()

    log.info("UpSert Complete")