
I have some code that takes a long time to run (~15 min/loop x 30 loops). Each loop also produces a significant amount of data, which takes time to write to disk as well. All told, the whole thing can take several hours when run serially. I'm hoping to speed this up using the multiprocessing module.

So far I've parallelized the computationally expensive parts, but there's still a massive chokepoint when writing to disk, since everything has to converge back to the main process and the final total is upwards of a few million entries. To get around this, I'm trying to figure out whether I can perform the writes from each worker process directly.

Each loop creates several outputs (output1, output2, and so on), and each output needs to be written to a specific, known database, i.e. output1 -> db1, output2 -> db2. These databases are the same for every loop, and the data is simply appended to whatever is already there. Every entry also has a unique index for querying, so write order is not important.

Since I'm using pandas for the actual data analysis, I've been using its methods for storing these DataFrames. to_hdf works in the serial case, but it does not support parallel writes (and apparently can suffer corruption issues if you try), whereas I believe to_sql does. Unfortunately, I have not figured out how to create connections to the databases (I'm using sqlalchemy) in each process such that concurrent writes are safe.
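For reference, the direction I've been trying to go is to have each worker create its own engine rather than receive one from the parent, roughly like this (just a sketch of the idea; the table name "results" and the connection string are placeholders, and whether these concurrent appends are actually safe is exactly my question):

def write_worker(num):
    # Each worker builds its own engine, so nothing SQLAlchemy-related
    # ever has to cross a process boundary.
    engine = sql.create_engine("sqlite:///test.db")
    data = np.random.random((1000000, 4)) * num
    asDF = pd.DataFrame(data, index=np.linspace(num, num + 0.99, 1000000))
    # Append to the existing table; the unique index means order doesn't matter.
    asDF.to_sql("results", engine, if_exists="append")
    engine.dispose()
    return None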

I've tried various ways of passing objects between processes, but I always seem to hit errors such as Synchronized objects should only be shared between processes through inheritance when sending things like sqlalchemy.scoped_session through an mp.Queue, or can't pickle _thread._local objects when trying to pass them as a function argument.
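One pattern I've been experimenting with to avoid the pickling errors is to build the engine once per worker via the Pool's initializer, so only the connection string ever crosses the process boundary (again just a sketch; I still don't know whether the resulting concurrent writes would be safe):

import multiprocessing as mp
import sqlalchemy as sql

engine = None  # per-process global, set by the initializer below

def init_worker(db_url):
    global engine
    # Runs once in each worker process; only the URL string is pickled.
    engine = sql.create_engine(db_url)

def do_work(num):
    # ... build the DataFrame and write it using the per-process engine ...
    return num

if __name__ == "__main__":
    with mp.Pool(initializer=init_worker, initargs=("sqlite:///test.db",)) as p:
        p.map(do_work, range(10))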

Am I just barking up the wrong tree here? Or am I missing something obvious?

The 'test case' I've been using to try things out follows:

import multiprocessing as mp
import numpy as np
import pandas as pd

import sqlalchemy as sql

def sqltest(num):
    print("Hello from process {0}".format(mp.current_process().name))
    # Build ~1 million rows of dummy data with a unique, non-overlapping index.
    data = np.random.random((1000000, 4)) * num
    asDF = pd.DataFrame(data, index=np.linspace(num, num + 0.99, 1000000))
    # Need to write to disk in here
    return None

def func_callback(c):
    print("Got completion callback {0}".format(c))

def err_callback(c):
    print("Got error callback {0}".format(c))

def main():
    nums = range(0, 10)
    # Engine created in the parent process -- I haven't found a safe way to
    # hand this (or connections made from it) to the workers.
    sqldb = sql.create_engine("sqlite:///test.db")

    with mp.Pool() as p:
        for i in nums:
            p.apply_async(sqltest, args=(i,),
                          callback=func_callback,
                          error_callback=err_callback)
        p.close()
        p.join()

if __name__ == '__main__':
    main()
  • This answer suggests that concurrent writing to a SQLite database is not enabled by default; you would need to enable WAL: https://stackoverflow.com/a/10387821/548562 – Iain Shelvington Nov 03 '19 at 05:01
  • With sqlite, only one connection to the database can hold a write lock at a time, WAL or not. If you want multiple simultaneous writers, you need a different database. – Shawn Nov 03 '19 at 08:21
