
I am extracting a large number of files (~850,000) and loading them into a Postgres database with SQLAlchemy. I can load the files into the database just fine without multiprocessing, but it takes more than a day, and I would like it to load faster.

I created a process pool with the following code:

def load_submissions(submissions_path:str, db_url:str) -> None:
    submissions = {}
    for file in os.listdir(submissions_path):

        file_path = os.path.abspath(os.path.join(submissions_path, file))
        file = os.path.basename(file)
        cik = numbers_from_filename(file)
        # only take the left ten digits of the cik
        cik = cik[:10] if cik else None
        
        if cik:
            if cik in submissions:
                submissions[cik].append(file_path)
            else:
                submissions[cik] = [file_path]
        else:
            log.info(f'Could not find a CIK in the filename: {file_path}')
    log.debug(f'Submissions loaded')

    tasks = [(files, db_url) for files in submissions.values()]
    log.debug(f'Creating pool with {multiprocessing.cpu_count()} processes')
    try:
        num_processes = min(len(submissions), multiprocessing.cpu_count())
        with multiprocessing.get_context("spawn").Pool(processes=num_processes) as pool:
            log.debug(f'Loading submissions with multiprocessing.Pool()')
            pool.starmap(load_submission, tasks)

    except Exception as e:
        log.exception(e)

This calls the function load_submission, which uses the init function to create the database engine. Both are shown here:

def init(db_url: str) -> sqlalchemy.engine.Engine:
    engine = create_engine(db_url, poolclass=sqlalchemy.pool.QueuePool)
    # check to see if the tables exists, if not create them
    if not inspect(engine).has_table(engine, 'Company'):
        Base.metadata.tables['Company'].create(engine)
    if not inspect(engine).has_table(engine, 'FormerName'):
        Base.metadata.tables['FormerName'].create(engine)
    if not inspect(engine).has_table(engine, 'Address'):
        Base.metadata.tables['Address'].create(engine)
    if not inspect(engine).has_table(engine, 'Ticker'):
        Base.metadata.tables['Ticker'].create(engine)
    if not inspect(engine).has_table(engine, 'ExchangeCompany'):
        Base.metadata.tables['ExchangeCompany'].create(engine)
    if not inspect(engine).has_table(engine, 'Filing'):
        Base.metadata.tables['Filing'].create(engine)
    return engine

def load_submission(submission_files: list[str], db_url: str) -> None:

    submission = extract_submission(submission_files)

    c = submission.get('cik')
    if not c:
        raise KeyError(f'No CIK found in submission')
    c = c.zfill(10)  

    log.debug(f'Process {os.getpid()} is loading submission for CIK: {c}')
    
    engine = db.init(db_url)

    log.debug(f'Engine initialized for CIK: {c}')

    with db.session(engine) as session:        
        log.debug(f'Session opened for CIK: {c}')

        company = db.Company(
            cik = c,
            name = submission.get('name'),
            sic = submission.get('sic'),
            entityType = submission.get('entityType'),
            insiderTransactionForOwnerExists = submission.get('insiderTransactionForOwnerExists'),
            insiderTransactionForIssuerExists = submission.get('insiderTransactionForIssuerExists'),
            ein = submission.get('ein'),
            description = submission.get('description'),
            website = submission.get('website'),
            investorWebsite = submission.get('investorWebsite'),
            category = submission.get('category'),
            fiscalYearEnd = submission.get('fiscalYearEnd'),
            stateOfIncorporation = submission.get('stateOfIncorporation'),
            phone = submission.get('phone'),
            flags = submission.get('flags')
        )
        session.merge(company)
        session.commit()
        log.debug(f'Company added to database: {c}')

        for ticker in submission['tickers']:
            t = db.Ticker(
                ticker = ticker,
                cik = c
            )
            session.merge(t)

        for exchange in submission['exchanges']:

            ec = db.ExchangeCompany(
                exchange = exchange,
                cik = c
            )
            session.merge(ec)

        addresses = submission['addresses']
        for description, address in addresses.items():
            a = db.Address(
                cik = c,
                description = description,
                street1 = address.get('street1'),
                street2 = address.get('street2'),
                city = address.get('city'),
                stateOrCountry = address.get('stateOrCountry'),
                zipCode = address.get('zipCode')
            )
            session.merge(a)

        former_names = submission['formerNames']
        for former_name in former_names:
            f = db.FormerName(
                formerName = former_name.get('name'),
                _from = datetime.fromisoformat(former_name.get('from')) if former_name.get('from') else None,
                to = datetime.fromisoformat(former_name.get('to')) if former_name.get('to') else None,
                cik = c
            )
            session.merge(f)
        session.commit()
        log.debug(f'Ticker, ExchangeCompany, Address, and FormerName added to database: {c}')
        session.close()
        log.debug(f'Session closed for CIK: {c}')

        df = pd.DataFrame(submission['filings']['recent'])

        # if the dataframe is empty, return
        if df.empty:
            engine.dispose()
            return

        # add url column and cik column to dataframe
        df['url'] = df.apply(lambda row: f'{ARCHIVES_URL}/{str(int(c))}/{row["accessionNumber"].replace("-", "")}/{row["accessionNumber"]}.txt', axis=1)
        df['cik'] = c

        # Convert date columns to datetime objects and handle empty strings
        date_columns = ['filingDate', 'reportDate', 'acceptanceDateTime']
        for col in date_columns:
            df[col] = pd.to_datetime(df[col], errors='coerce')

        # create data types dict to pass to to_sql
        dtypes = {
            'cik': sqltypes.VARCHAR(),
            'accessionNumber': sqltypes.VARCHAR(),
            'filingDate': sqltypes.Date(),
            'reportDate': sqltypes.Date(),
            'acceptanceDateTime': sqltypes.DateTime(),
            'act': sqltypes.VARCHAR(),
            'fileNumber': sqltypes.VARCHAR(),
            'filmNumber': sqltypes.VARCHAR(),
            'items': sqltypes.VARCHAR(),
            'size': sqltypes.INTEGER(),
            'isXBRL': sqltypes.Boolean,
            'isInlineXBRL': sqltypes.Boolean,
            'primaryDocument': sqltypes.VARCHAR(),
            'primaryDocumentDescription': sqltypes.VARCHAR()
        }

        # write dataframe to database
        df.to_sql('Filing', engine, if_exists='append', index=False, dtype=dtypes)
        log.debug(f'Filings added to database: {c}')

    log.debug(f'Session closed for CIK: {c}')
    engine.dispose()
    log.debug(f'Engine disposed for CIK: {c}')

I am getting the following error when calling the load_submissions function:

ERROR:eminor.eminor:Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x00000141F8F0FF90>'. Reason: 'AttributeError("Can't pickle local object 'create_engine.<locals>.connect'")'
Traceback (most recent call last):
  File "C:\Users\lucky\Repos\lazy_prices\src\python\src\eminor\eminor.py", line 275, in load_submissions
    pool.starmap(load_submission, tasks)
  File "C:\Program Files\Python311\Lib\multiprocessing\pool.py", line 375, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Python311\Lib\multiprocessing\pool.py", line 774, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x00000141F8F0FF90>'. Reason: 'AttributeError("Can't pickle local object 'create_engine.<locals>.connect'")'

Why is it complaining about a pickling error with the create_engine function from sqlalchemy?

I tried moving the init function out of load_submission to see if instantiating the engine outside of the process pool and giving every process the same copy of the engine would work, but that failed as well.
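
For reference, one way to give each worker process its own engine (instead of sharing one engine across processes, or building a new engine for every task) is a Pool initializer. This is only a simplified sketch with hypothetical names, not my actual code:

import multiprocessing
from sqlalchemy import create_engine

_engine = None  # one engine per worker process, set by the initializer

def _init_worker(db_url: str) -> None:
    # runs once in each worker process; every process gets its own engine and pool
    global _engine
    _engine = create_engine(db_url)

def _load_one(files: list[str]) -> None:
    # hypothetical worker body: reuse the per-process engine for this submission
    with _engine.begin() as conn:
        ...  # insert the rows extracted from `files`

def load_all(tasks: list[list[str]], db_url: str) -> None:
    num_processes = min(len(tasks), multiprocessing.cpu_count())
    with multiprocessing.get_context("spawn").Pool(
        processes=num_processes, initializer=_init_worker, initargs=(db_url,)
    ) as pool:
        pool.map(_load_one, tasks)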

I expect the load_submission function to run concurrently in multiple processes and load files into the database in parallel.

Thanks in advance for your support!

Michael
  • Curiously, did you research the error *Can't pickle local object*? Very informative SO post [here](https://stackoverflow.com/q/72766345/1422451). Be sure to [research](https://meta.stackoverflow.com/q/261592/1422451) before asking. – Parfait Aug 20 '23 at 22:25
  • What I think I understand from that post is that I would need to define the `create_engine` library before I call the multiprocessing pool? I don't understand how I would do that since it is part of the sqlalchemy package. – Michael Aug 21 '23 at 03:18
  • Yeah, hard to say. Keep on researching. Quick finds: [Connection problems with SQLAlchemy and multiple processes](https://stackoverflow.com/questions/41279157/connection-problems-with-sqlalchemy-and-multiple-processes) and [Using Connection Pools with Multiprocessing or os.fork()](https://docs.sqlalchemy.org/en/20/core/pooling.html#pooling-multiprocessing). – Parfait Aug 21 '23 at 21:02
  • Awesome, thank you for those. I actually just found that I had an error in the `init` function. I think (but did not truly confirm) that the error was throwing an exception which was being picked up by pickle and that was causing pickle to throw an exception. I fixed the error in the init function and it worked! – Michael Aug 22 '23 at 02:02
  • Interesting! Feel free to answer your own question with specific error you found to help future readers! – Parfait Aug 22 '23 at 14:30

1 Answer


TL;DR: If code running inside a multiprocessing pool raises an exception that pickle cannot serialize, the pool fails to send the real error back to the parent process and reports this message instead.
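
Here is a minimal sketch (not my actual code) that reproduces the same kind of failure: the worker raises an exception that pickle cannot serialize, so the pool cannot return the real error and raises a MaybeEncodingError instead:

import multiprocessing

class BadError(Exception):
    def __init__(self):
        super().__init__("boom")
        # a local function attached to the exception makes it unpicklable
        self.cleanup = lambda: None

def worker(_):
    raise BadError()

if __name__ == "__main__":
    with multiprocessing.get_context("spawn").Pool(2) as pool:
        # fails with multiprocessing.pool.MaybeEncodingError:
        # "Error sending result ... Can't pickle local object ..."
        pool.map(worker, range(2))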

This ended up being a simple fix, but it wasn't readily apparent from the error message.

There was an error in my call to the sqlalchemy inspect function: I had too many parameters. This caused it to raise an exception, and that exception was raised inside the multiprocessing pool.
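
Based on the code in the question, the fix amounts to dropping the extra argument, since Inspector.has_table takes the table name rather than the engine:

# before -- raises inside the worker process
if not inspect(engine).has_table(engine, 'Company'):
    Base.metadata.tables['Company'].create(engine)

# after -- correct signature
if not inspect(engine).has_table('Company'):
    Base.metadata.tables['Company'].create(engine)

(Base.metadata.create_all(engine) would also create only the missing tables and avoid the manual checks entirely.)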

I am making some assumptions about the reasons this fix worked and did not truly verify that the failure came from how pickle and multiprocessing.Pool handle exceptions, but fixing the error in my code made everything else run just fine.

Michael