Overview:
I am trying to create a large number of variables (a wide dataset) for a financial data analysis project. I have a pandas dataframe "position_history" with 140,000 rows, each containing a stock and its buy/sell dates and prices.
I have a function create_domain that takes inputs (stock, buy_date, sell_date) and:
- queries my SQLite3 database to extract the time series for that stock between those dates.
- constructs my variables from the time series
I apply the function create_domain to position_history row by row using df.apply.
When I run my code sequentially, it takes about 4 hours to construct my variables. I would like to speed this up using multiple processes, since I will have to do this many times and may need a much wider dataset.
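Roughly, the sequential version looks like this (heavily simplified -- the database path, table/column names, and build_variables are placeholders, and I'm showing a plain sqlite3 connection here just to keep the sketch short; my real code goes through sqlalchemy.create_engine):

import sqlite3
import pandas as pd

conn = sqlite3.connect('prices.db')  # placeholder path

def create_domain_row(row):
    # pull the time series for this position from SQLite
    ts = pd.read_sql_query(
        "SELECT * FROM prices WHERE ticker = ? AND date BETWEEN ? AND ?",
        conn, params=(row['stock'], row['buy_date'], row['sell_date']))
    # construct the variables for this position from the time series
    return build_variables(ts)  # placeholder for my feature logic

variables = position_history.apply(create_domain_row, axis=1)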
For multiple processes, I split position_history row-wise into chunks, creating a list of smaller dataframes, and pass that list to joblib (multiprocessing backend). My code almost always hangs indefinitely without throwing any errors (though it sometimes runs on small samples).
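By "chunks" I just mean a list of row-wise pieces of position_history, e.g. something like this (np.array_split is only one way to do it):

import numpy as np

n_jobs = 4
chunks = np.array_split(position_history, n_jobs)  # list of smaller DataFrames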
My suspicion is that the problem arises from my worker processes trying to read the same SQLite table concurrently.
I have tried the following remedies:
opened a new connection (sqlalchemy.create_engine) inside the function create_domain, so that each worker process gets its own engine/connection (a simplified sketch is below, after this list of remedies)
following the SQLAlchemy documentation (http://docs.sqlalchemy.org/en/latest/faq/connections.html#how-do-i-use-engines-connections-sessions-with-python-multiprocessing-or-os-fork):
- changed poolclass to NullPool, which disables connection pooling, so the engine opens a fresh connection for each use and closes it afterwards
- just in case, called engine.dispose() in each child process so that any inherited connections are discarded and new ones get opened
- Note -- these steps were meant to ensure that no SQLite connection is ever shared across worker processes, since sharing one doesn't work (Is it possible with SQLAlchemy to share SQLite database among multiple Python processes?)
under joblib (https://pythonhosted.org/joblib/parallel.html):
- tried the 'threading' backend instead of 'multiprocessing'. This ran without hanging but didn't speed up the code by much, which makes sense from what I've read about threading (Multiprocessing vs Threading Python): because of the GIL, threads don't let CPU-bound Python code use multiple CPUs.
- tried the memmapping technique for large numpy arrays (since pandas is built on numpy); I don't think this was a relevant fix for me
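For reference, this is roughly what the per-worker-engine remedy looks like in my code (simplified; db_path and other_inputs are placeholders, and sql_query_and_create_vars is my own query/feature function, shown here taking the engine explicitly):

from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

def create_domain(chunk, db_path, other_inputs):
    # each worker process builds its own engine; NullPool means no pooling,
    # so every query opens a fresh connection and closes it afterwards
    engine = create_engine('sqlite:///' + db_path, poolclass=NullPool)
    try:
        result = chunk.apply(
            lambda row: sql_query_and_create_vars(engine, other_inputs, row),
            axis=1)
    finally:
        engine.dispose()  # make sure no connections outlive this child
    return result

The threading attempt is just the same call with the backend switched:

from joblib import Parallel, delayed
x = Parallel(n_jobs=4, backend='threading')(
    delayed(create_domain)(chunk, db_path, other_inputs) for chunk in chunks)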
Related stackoverflow entries:
- I would like to use multiple processes, not multithreading, as discussed in: python multiple threaded processes for running executables; Python sqlite3 and concurrency; SQLite3 and Multiprocessing (I don't understand the last answer on that one)
- I am only trying to read, not write, so I'm not sure whether managing SQLite's locking mechanisms is even necessary, as discussed in: SQLite suitable for concurrent reading?; Concurrent writing with sqlite3
- This post says concurrent access should work in theory: sqlite3 concurrent access. Note -- I don't think WAL mode would help me, because I am only reading.
(Pseudo) code snippets
My call to joblib:
from joblib import Parallel, delayed

# chunks is a list of row-wise pieces of the position_history df
x = Parallel(n_jobs=4)(delayed(create_domain)(chunk, other_inputs) for chunk in chunks)
My create_domain function:
def create_domain(df, inputs):
    # create my variables from each row of this chunk of position_history
    f = lambda x: sql_query_and_create_vars(inputs, x['column'])
    result = df.apply(f, axis=1)
    return result
Summary: My code hangs indefinitely (and sometimes crashes the kernel) without raising any errors. I would appreciate any insight into:
- Why is this happening?
- How can I fix it?
- Is there simply a better way to do what I'm trying to do? I did my best to vectorize and optimize my SQL queries.
- Is this a problem with SQLite3? Would something like MySQL work better?
- Any tips? I'm new to coding/Python/data science.
Details: I am on a supercomputer cluster, running Linux, and using IPython on Python 3.4.3.
This is my first Stack Overflow question -- sorry in advance for any faux pas, and thanks for your help!