I have some code that takes a long time to run (~15 mins/loop x 30 loops). Each loop also produces a significant amount of data, which takes some time to write to disk. All told, a serial run can take several hours. I'm hoping to speed this up using the multiprocessing module.
So far I've parallelized the computationally expensive parts, but there's still a rather massive chokepoint when writing to disk, since everything has to converge back to the main process and the final total is upwards of a few million entries. To that end, I'm trying to figure out whether I can perform the writes from each multiprocessing task to speed things up.
Each loop creates several outputs (output1, output2, and so on), each of which should be written to a specific (and known) database, i.e. output1 -> db1, output2 -> db2. These databases are the same for each loop, and the data is just appended to whatever is already there. The data also all has unique indexes for querying, so order is not important.
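Schematically, the routing I have in mind looks something like this (the names and URLs here are just placeholders standing in for my real outputs and databases):

# Placeholder sketch of the fixed output -> database routing;
# the keys and URLs stand in for my real ones.
DB_TARGETS = {
    "output1": "sqlite:///db1.db",
    "output2": "sqlite:///db2.db",
}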
Since I'm using pandas for the actual data analysis, I've been using its methods for storing these DataFrames. to_hdf works in the serial case, but does not support parallel writes (and apparently can have some corruption issues), whereas I believe to_sql does. Unfortunately, I have not figured out how to create connections to the databases (I'm using sqlalchemy) in each process such that concurrent writes are safe.
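My current thinking is that each worker may need to create its own engine after the fork, rather than receiving one from the parent. A minimal sketch of what I mean (write_output, db_url, and table are placeholder names, not anything from my real code):

import pandas as pd
import sqlalchemy as sql

def write_output(df, db_url, table):
    # Create the engine inside the worker process, after the fork,
    # so nothing connection-related ever has to be pickled.
    engine = sql.create_engine(db_url)
    try:
        # Append to whatever is already in the table; order doesn't
        # matter since every row has a unique index.
        df.to_sql(table, engine, if_exists="append")
    finally:
        # Close all pooled connections before the worker moves on.
        engine.dispose()

But I don't know whether this is actually safe when several processes hit the same database file at once.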
I've tried various ways of passing objects between processes, but I always seem to get errors such as "Synchronized Objects should only be shared between processes through inheritance" when passing objects through an mp.Queue (things like sqlalchemy.scoped_session), or "can't pickle _thread._local objects" when trying to pass them as function arguments.
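The only other pattern I can think of that avoids pickling entirely is giving the Pool an initializer, so each worker builds its own engine when it starts. A sketch of that idea (init_worker, worker, and the module-global _engine are names I made up):

import multiprocessing as mp
import sqlalchemy as sql

_engine = None  # one engine per worker process, never pickled

def init_worker(db_url):
    # Runs once in each child process when the pool starts, so the
    # engine is created after the fork instead of being shipped over.
    global _engine
    _engine = sql.create_engine(db_url)

def worker(num):
    # _engine is usable here without ever crossing a Queue or
    # being passed as an argument.
    print("worker {0} has engine {1}".format(num, _engine.url))

if __name__ == "__main__":
    with mp.Pool(initializer=init_worker, initargs=("sqlite:///test.db",)) as p:
        p.map(worker, range(10))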
Am I just barking up the wrong tree here? Or am I missing something obvious?
The 'test case' I've been using to try things out follows:
import multiprocessing as mp
import numpy as np
import pandas as pd
import sqlalchemy as sql

def sqltest(num):
    print("Hello from process {0}".format(mp.current_process().name))
    data = np.random.random((1000000, 4)) * num
    asDF = pd.DataFrame(data, index=np.linspace(num, num + 0.99, 1000000))
    # Need to write to disk in here
    return None

def func_callback(c):
    print("Got completion callback {0}".format(c))

def err_callback(c):
    print("Got error callback {0}".format(c))

def main():
    nums = range(0, 10)
    # Engine created in the parent process; this is the object I
    # can't figure out how to get into the workers safely.
    sqldb = sql.create_engine("sqlite:///test.db")
    with mp.Pool() as p:
        for i in nums:
            p.apply_async(sqltest, args=(i,),
                          callback=func_callback, error_callback=err_callback)
        p.close()
        p.join()

if __name__ == '__main__':
    main()