
I have 1,000,000 records that I am trying to insert into the database; unfortunately, some of the records do not conform to the db schema. At the moment, when a record fails, I do the following:

  1. roll back the transaction
  2. observe the exception
  3. fix the issue
  4. run again

I wish to build a script which would set aside all the "bad" records but commit all the correct ones.

Of course I can commit one by one and then, when a commit fails, roll back and commit the next, but I would pay a "run time price" as the code would run for a long time.
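
Something like this is what I mean by committing one by one (a rough sketch only, using the session and File objects shown below; one_by_one.py is just an illustrative name):

#one_by_one.py
from sqlalchemy.exc import IntegrityError

from . import session

def commit_one_by_one(records):
    bad_records = []
    for file in records:
        session.add(file)
        try:
            session.commit()         # one round trip per record -> slow
        except IntegrityError:
            session.rollback()       # discard only the failing record
            bad_records.append(file)
    return bad_records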

In the example below I have two models, File and Client, with a one-to-many relation (one client has many files). In commit.py I wish to commit 1M File objects at once, or in batches (1k). At the moment I only find out that something failed when I commit at the end. Is there a way to know which objects are "bad" (integrity errors on the foreign key, for example) beforehand, i.e. park them aside (in another list) while committing all the "good" ones?

Thanks a lot for the help.

#model.py

from sqlalchemy import Column, DateTime, String, func, Integer, ForeignKey

from . import base
class Client(base):
    __tablename__ = 'clients'
    id = Column(String, primary_key=True)

class File(base):
    __tablename__ = 'files'
    id = Column(Integer, primary_key=True, autoincrement=True)
    client_id = Column(String, ForeignKey('clients.id'))
    path = Column(String)


#init.py
import os
from dotenv import load_dotenv
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base

load_dotenv()
db_name = os.getenv("DB_NAME")
db_host = os.getenv("DB_HOST")
db_port = os.getenv("DB_PORT")
db_user = os.getenv("DB_USER")
db_password = os.getenv("DB_PASSWORD")

db_uri = 'postgresql://' + db_user + ':' + db_password + '@' + db_host + ':' + db_port + '/' + db_name
print(f"product_domain: {db_uri}")


base = declarative_base()
engine = create_engine(db_uri)
base.metadata.bind = engine
Session = sessionmaker(bind=engine)
session = Session()
conn = engine.connect()

#commit.py
from . import session

def commit(list_of_1m_File_objects_records):
    # I wish to loop over the rows and, if a specific row raises an exception, put it in a list and handle it afterwards
    for file in list_of_1m_File_objects_records:
        session.add(file)
    session.commit()
    
# client:
# id
# "a"
# "b"
# "c"
# file:
# id | client_id | path
# ---|-----------|--------------
# 1  | "a"       | "path1.txt"
# 2  | "aa"      | "path2.txt"
# 3  | "c"       | "path143.txt"
# 4  | "a"       | "pat.txt"
# 5  | "b"       | "patb.txt"
# I wish the file data to enter the database even though it has one record ("aa") which will raise an integrity error


helpper
  • So the error you have is an integrity error. Can we assume that your source of the 1M rows does not introduce the integrity error? i.e., is your data unique on id? I don't use sqlalchemy much, but it seems to me that integrity errors can't be discovered by the database server until you attempt to write to the database. Below, someone suggested that you check for existing records first. session.commit at the conclusion of your loop is fast because it defers database writes. Or perhaps session.merge() is interesting. See https://stackoverflow.com/a/26018934/401226 – Tim Richardson Jan 05 '21 at 10:37
  • The opposite: we can assume that one of the 1M rows is causing the error. In that case, what is the fastest way to commit? – helpper Jan 05 '21 at 13:03
  • If you want to update the database with the new row in case of integrity error, you can try session.merge – Tim Richardson Jan 06 '21 at 01:37
  • ... or take the suggestion below: get all the existing IDs via a query, put them in a python set() and check the ID before you do the session.add(). This is probably the fastest way. – Tim Richardson Jan 06 '21 at 01:56

2 Answers


Since I can't comment: I would suggest using psycopg2 and SQLAlchemy to create the connection to the db, and then running an INSERT query with an ON CONFLICT clause at the end to add and commit your data.
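
For example, something along these lines with SQLAlchemy's PostgreSQL dialect (a sketch only, assuming the File model and session from the question; note that ON CONFLICT only swallows unique-constraint conflicts, so a foreign-key violation like the "aa" client would still abort the statement):

from sqlalchemy.dialects.postgresql import insert

from . import session
from .model import File

def insert_skipping_duplicates(rows):
    # rows: list of dicts, e.g. [{"client_id": "a", "path": "path1.txt"}, ...]
    stmt = insert(File.__table__).values(rows).on_conflict_do_nothing()
    session.execute(stmt)
    session.commit()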

Joaquín

Of course I can commit one by one and then, when a commit fails, roll back and commit the next, but I would pay a "run time price" as the code would run for a long time.

What is the source of that price? If it is fsync speed, you can get rid of most of that cost by setting synchronous_commit to off on the local connection. If you have a crash part way through, you then need to figure out which records had actually been recorded once the server comes back up, so you know where to start again, but I wouldn't think that would be hard to do. This method should get you the most benefit for the least work.
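
For example (a sketch; this assumes psycopg2 as the driver and the db_uri from the question, with the options string passed straight through to PostgreSQL):

from sqlalchemy import create_engine

# Every connection made by this engine asks PostgreSQL not to wait for the
# WAL flush at COMMIT time (same effect as SET synchronous_commit TO OFF).
engine = create_engine(
    db_uri,
    connect_args={"options": "-c synchronous_commit=off"},
)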

At the moment I only find out that something failed when I commit at the end

It sounds like you are using deferred constraints. Is there a reason for that?

is there a way to know which objects are "bad" (integrity errors on the foreign key, for example)

For the case of that example, read all the Client ids into a dictionary before you start (assuming they will fit in RAM) then test Files on the python side so you can reject the orphans before trying to insert them.
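
Something like this (a sketch, assuming the session and the Client model from the question; a set is enough here, since only membership is tested):

from . import session
from .model import Client

def split_and_commit(files, batch_size=1000):
    # All existing client ids, loaded once up front.
    client_ids = {cid for (cid,) in session.query(Client.id)}

    good = [f for f in files if f.client_id in client_ids]
    bad = [f for f in files if f.client_id not in client_ids]

    # Insert the valid rows in batches of 1k, as in the question.
    for start in range(0, len(good), batch_size):
        session.add_all(good[start:start + batch_size])
        session.commit()

    return bad   # the orphans, parked aside for later handling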

jjanes
  • Regarding the "time price" - sorry for the uncertainty, but I am a bit confused: session.add does not insert into the db, it is a "pending request"; session.commit writes to the db, therefore the commit statement takes much longer than the add, doesn't it? – helpper Dec 22 '20 at 19:40