I implemented some logic (on Windows 10) that creates a vector of simulated spot prices based on NumPy random variables. In the end I need 10, 100 or 1000 of these vectors. Generating 1000 of them with a simple for-loop takes roughly 160 seconds. I tried every way of parallelizing this I could find, e.g. here on Stack Overflow and beyond. Some methods did not even work, others had no effect at all. So either:
- my use of these parallelization tools was wrong ...
- ... or my function can't be parallelized (because NumPy already uses all threads of the CPU???)
Here is my function (`mrm`, `mp` and `spm` are my custom modules):
def tt(i):
    random_variables = np.random.standard_normal((3, I))
    mean_reversion_model = mrm.Model(t, m, random_variables, vola, df_regression_param,
                                     df_mean_reversion_param)
    year_price_simulation = mp.Simulation(mean_reversion_model, df_initial_price)
    year_prices = year_price_simulation.prices()
    monthly_prices = mp.MonthlyPrices(year_prices, monthly_factors_file="month_factors_mr.csv",
                                      date_today=date_today, years_to_future=years_to_future,
                                      debug=False)
    df_S_monthly = monthly_prices.get_monthly_prices()
    spot_price_simulation = spm.SpotPrice(jumps, mr_regression, 1., 365, 0.0, df_S_monthly,
                                          verbose=False)
    res = spot_price_simulation.get_simulated_spot_prices()
    # result_dict[i] = res  # only needed in the multiprocessing examples
    return res
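For context on the timing: 1000 runs in ~160 s is roughly 0.16 s per call. Before parallelizing, it helps to measure one call in isolation; here is a minimal sketch of that measurement using a hypothetical toy stand-in (`toy` is not my `tt` — the `mrm`/`mp`/`spm` internals aren't shown above):

```python
import time
import numpy as np

def toy(i):
    # hypothetical stand-in for tt: independent NumPy work per call
    rng = np.random.default_rng(i)
    return rng.standard_normal((3, 1000)).cumsum(axis=1)

# time a single call to establish the per-iteration baseline cost
t0 = time.perf_counter()
res = toy(0)
per_call = time.perf_counter() - t0
print(res.shape, per_call)
```
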
And these were my attempts (all placed after `if __name__ == '__main__':`):
multiprocessing
import multiprocessing
N = 10
MAX_WORKERS = 4
t0 = time.time()
pool = multiprocessing.Pool(processes=MAX_WORKERS)
t = pool.map(tt, range(N)) # dictionary in function is used to collect results
pool.close()
pool.join()
t1 = time.time() - t0
print("Elapsed time: {}s".format(t1))
-> won't come back...
multiprocessing.pool
import multiprocessing.pool
N = 100
MAX_WORKERS = 4
t0 = time.time()
with multiprocessing.pool.ThreadPool(processes=MAX_WORKERS) as pool:
    t = pool.map(tt, range(N))  # dictionary in function is used to collect results
t1 = time.time() - t0
print("Elapsed time: {}s".format(t1))
-> no improvement, same calculation time as for-loop
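That outcome is consistent with the GIL: threads only speed up CPU-bound work while the GIL is released (large NumPy operations release it; Python-level loops don't). A small sketch with a deliberately GIL-bound toy function shows threads giving no speedup over serial execution:

```python
import time
from multiprocessing.pool import ThreadPool

def gil_bound(n):
    # pure-Python arithmetic: holds the GIL for the whole call
    s = 0
    for k in range(n):
        s += k * k
    return s

N, WORK = 4, 2_000_000

t0 = time.time()
serial = [gil_bound(WORK) for _ in range(N)]
serial_time = time.time() - t0

t0 = time.time()
with ThreadPool(processes=4) as pool:
    threaded = pool.map(gil_bound, [WORK] * N)
threaded_time = time.time() - t0

# threaded_time is typically about the same as serial_time:
# the GIL lets only one thread run Python bytecode at a time
print(serial_time, threaded_time)
```
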
concurrent.futures
import concurrent.futures
N = 100
result_dict = dict().fromkeys(range(N))
MAX_WORKERS = 4
t0 = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:  # note: MAX_WORKERS is never actually passed here
    for idx, out in enumerate(executor.map(tt, range(N))):
        result_dict[idx] = out
t1 = time.time() - t0
print("Elapsed time: {}s".format(t1))
-> no improvement, same calculation time as for-loop
asyncio
import asyncio
def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
    return wrapped
N = 10
result_dict = dict().fromkeys(range(N))
MAX_WORKERS = 4
t0 = time.time()
for i in range(N):
    result_dict[i] = tt(i)  # tt decorated with @background for this attempt
t1 = time.time() - t0
print("Elapsed time: {}s".format(t1))
-> Error: asyncio await wasn't used with future
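That error arises because `run_in_executor` returns futures, which must be awaited (e.g. via `asyncio.gather`) rather than stored directly. A minimal corrected sketch of the pattern (with a hypothetical stand-in for `tt`; note the default executor is thread-based, so this still doesn't escape the GIL for CPU-bound Python code):

```python
import asyncio

def work(i):
    # hypothetical CPU-light stand-in for tt
    return i * i

async def main(n):
    loop = asyncio.get_running_loop()
    # run_in_executor schedules work on the default ThreadPoolExecutor and
    # returns awaitable futures; gather awaits them all and keeps the order
    futures = [loop.run_in_executor(None, work, i) for i in range(n)]
    return await asyncio.gather(*futures)

results = asyncio.run(main(10))
print(results)
```
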
numpy apply method
import numpy as np
N = 100
test = np.zeros((N, 1))
t0 = time.time()
res = np.apply_along_axis(tt, 1, test)
t1 = time.time() - t0
print("Elapsed time: {}s".format(t1))
-> no improvement
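That result is expected: `np.apply_along_axis` is not a parallelization tool at all — it iterates over the slices in a plain Python loop, calling the function once per row, which is why it performs exactly like the for-loop. A small sketch makes the sequential calls visible:

```python
import numpy as np

calls = []

def record(row):
    # apply_along_axis invokes this once per row, sequentially,
    # in an ordinary Python loop -- no parallelism is involved
    calls.append(row[0])
    return row[0]

test = np.arange(5, dtype=float).reshape(5, 1)
out = np.apply_along_axis(record, 1, test)
print(list(out), calls)
```
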