I am extracting a large number of files (~850,000) and loading them into a Postgres database with SQLAlchemy. I can load the files into the database just fine without multiprocessing, but it takes more than a day, and I would like it to load faster.
I created a process pool with the following code:
def load_submissions(submissions_path: str, db_url: str) -> None:
    submissions = {}
    for file in os.listdir(submissions_path):
        file_path = os.path.abspath(os.path.join(submissions_path, file))
        file = os.path.basename(file)
        cik = numbers_from_filename(file)
        # only take the left ten digits of the cik
        cik = cik[:10] if cik else None
        if cik:
            if cik in submissions:
                submissions[cik].append(file_path)
            else:
                submissions[cik] = [file_path]
        else:
            log.info(f'Could not find a CIK in the filename: {file_path}')
    log.debug('Submissions loaded')
    tasks = [(files, db_url) for files in submissions.values()]
    log.debug(f'Creating pool with {multiprocessing.cpu_count()} processes')
    try:
        num_processes = min(len(submissions), multiprocessing.cpu_count())
        with multiprocessing.get_context("spawn").Pool(processes=num_processes) as pool:
            log.debug('Loading submissions with multiprocessing.Pool()')
            pool.starmap(load_submission, tasks)
    except Exception as e:
        log.exception(e)
This calls the load_submission function, which in turn uses the init function to create the database engine. Both are shown here:
def init(db_url: str) -> sqlalchemy.engine.Engine:
    engine = create_engine(db_url, poolclass=sqlalchemy.pool.QueuePool)
    # check to see if the tables exist, if not create them
    if not inspect(engine).has_table('Company'):
        Base.metadata.tables['Company'].create(engine)
    if not inspect(engine).has_table('FormerName'):
        Base.metadata.tables['FormerName'].create(engine)
    if not inspect(engine).has_table('Address'):
        Base.metadata.tables['Address'].create(engine)
    if not inspect(engine).has_table('Ticker'):
        Base.metadata.tables['Ticker'].create(engine)
    if not inspect(engine).has_table('ExchangeCompany'):
        Base.metadata.tables['ExchangeCompany'].create(engine)
    if not inspect(engine).has_table('Filing'):
        Base.metadata.tables['Filing'].create(engine)
    return engine
def load_submission(submission_files: list[str], db_url: str) -> None:
    submission = extract_submission(submission_files)
    c = submission.get('cik')
    if not c:
        raise KeyError('No CIK found in submission')
    c = c.zfill(10)
    log.debug(f'Process {os.getpid()} is loading submission for CIK: {c}')
    engine = db.init(db_url)
    log.debug(f'Engine initialized for CIK: {c}')
    with db.session(engine) as session:
        log.debug(f'Session opened for CIK: {c}')
        company = db.Company(
            cik = c,
            name = submission.get('name'),
            sic = submission.get('sic'),
            entityType = submission.get('entityType'),
            insiderTransactionForOwnerExists = submission.get('insiderTransactionForOwnerExists'),
            insiderTransactionForIssuerExists = submission.get('insiderTransactionForIssuerExists'),
            ein = submission.get('ein'),
            description = submission.get('description'),
            website = submission.get('website'),
            investorWebsite = submission.get('investorWebsite'),
            category = submission.get('category'),
            fiscalYearEnd = submission.get('fiscalYearEnd'),
            stateOfIncorporation = submission.get('stateOfIncorporation'),
            phone = submission.get('phone'),
            flags = submission.get('flags')
        )
        session.merge(company)
        session.commit()
        log.debug(f'Company added to database: {c}')
        for ticker in submission['tickers']:
            t = db.Ticker(
                ticker = ticker,
                cik = c
            )
            session.merge(t)
        for exchange in submission['exchanges']:
            ec = db.ExchangeCompany(
                exchange = exchange,
                cik = c
            )
            session.merge(ec)
        addresses = submission['addresses']
        for description, address in addresses.items():
            a = db.Address(
                cik = c,
                description = description,
                street1 = address.get('street1'),
                street2 = address.get('street2'),
                city = address.get('city'),
                stateOrCountry = address.get('stateOrCountry'),
                zipCode = address.get('zipCode')
            )
            session.merge(a)
        former_names = submission['formerNames']
        for former_name in former_names:
            f = db.FormerName(
                formerName = former_name.get('name'),
                _from = datetime.fromisoformat(former_name.get('from')) if former_name.get('from') else None,
                to = datetime.fromisoformat(former_name.get('to')) if former_name.get('to') else None,
                cik = c
            )
            session.merge(f)
        session.commit()
        log.debug(f'Ticker, ExchangeCompany, Address, and FormerName added to database: {c}')
        session.close()
        log.debug(f'Session closed for CIK: {c}')
    df = pd.DataFrame(submission['filings']['recent'])
    # if the dataframe is empty, return
    if df.empty:
        engine.dispose()
        return
    # add url column and cik column to dataframe
    df['url'] = df.apply(lambda row: f'{ARCHIVES_URL}/{str(int(c))}/{row["accessionNumber"].replace("-", "")}/{row["accessionNumber"]}.txt', axis=1)
    df['cik'] = c
    # Convert date columns to datetime objects and handle empty strings
    date_columns = ['filingDate', 'reportDate', 'acceptanceDateTime']
    for col in date_columns:
        df[col] = pd.to_datetime(df[col], errors='coerce')
    # create data types dict to pass to to_sql
    dtypes = {
        'cik': sqltypes.VARCHAR(),
        'accessionNumber': sqltypes.VARCHAR(),
        'filingDate': sqltypes.Date(),
        'reportDate': sqltypes.Date(),
        'acceptanceDateTime': sqltypes.DateTime(),
        'act': sqltypes.VARCHAR(),
        'fileNumber': sqltypes.VARCHAR(),
        'filmNumber': sqltypes.VARCHAR(),
        'items': sqltypes.VARCHAR(),
        'size': sqltypes.INTEGER(),
        'isXBRL': sqltypes.Boolean(),
        'isInlineXBRL': sqltypes.Boolean(),
        'primaryDocument': sqltypes.VARCHAR(),
        'primaryDocumentDescription': sqltypes.VARCHAR()
    }
    # write dataframe to database
    df.to_sql('Filing', engine, if_exists='append', index=False, dtype=dtypes)
    log.debug(f'Filings added to database: {c}')
    log.debug(f'Session closed for CIK: {c}')
    engine.dispose()
    log.debug(f'Engine disposed for CIK: {c}')
I am getting the following error when calling the load_submissions function:
ERROR:eminor.eminor:Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x00000141F8F0FF90>'. Reason: 'AttributeError("Can't pickle local object 'create_engine.<locals>.connect'")'
Traceback (most recent call last):
  File "C:\Users\lucky\Repos\lazy_prices\src\python\src\eminor\eminor.py", line 275, in load_submissions
    pool.starmap(load_submission, tasks)
  File "C:\Program Files\Python311\Lib\multiprocessing\pool.py", line 375, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Python311\Lib\multiprocessing\pool.py", line 774, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x00000141F8F0FF90>'. Reason: 'AttributeError("Can't pickle local object 'create_engine.<locals>.connect'")'
Why is it complaining about a pickling error involving the create_engine function from SQLAlchemy?
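For reference, the "local object" wording is the same message Python gives when pickling any function that was defined inside another function (pickle serializes functions by their qualified name), e.g. this standalone snippet fails the same way:

import pickle

def outer():
    # inner is a "local object": its qualified name is 'outer.<locals>.inner',
    # and pickle cannot look that name up at module level, so pickling fails
    def inner():
        pass
    return inner

pickle.dumps(outer())
# AttributeError: Can't pickle local object 'outer.<locals>.inner'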
I tried moving the init call out of load_submission, so that the engine was instantiated outside of the process pool and every process was given the same copy of the engine, but that failed as well.
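For completeness, this is roughly what that attempt looked like (simplified; only the relevant lines are shown):

# Simplified sketch of the failed attempt: create the engine once in the
# parent process and hand the same engine object to every worker task.
engine = db.init(db_url)
tasks = [(files, engine) for files in submissions.values()]
with multiprocessing.get_context("spawn").Pool(processes=num_processes) as pool:
    pool.starmap(load_submission, tasks)  # this variant failed as well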
I am expecting the load_submission function to run concurrently in multiple processes and load the files into the database in parallel.
Thanks in advance for your support!