
I implemented some logic (on Windows 10) that creates a vector of simulated spot prices based on numpy random variables. At the end of the day I need 10, 100 or 1000 of these vectors. Generating 1000 of them with a simple for-loop takes roughly 160 seconds. I tried every way of parallelizing this that I could find, e.g. here on Stack Overflow and beyond. Some methods did not even work, others had no effect at all. So either:

  1. my use of these parallelization tools was wrong ...
  2. ... or my function can't be parallelized (because it already uses all threads of the CPU?)

Here is my function (mrm, mp and spm are my custom modules):

import numpy as np

def tt(i):
    # one independent simulation run; I, t, m, vola, df_regression_param etc. are
    # defined in the enclosing scope
    random_variables = np.random.standard_normal((3, I))
    mean_reversion_model = mrm.Model(t, m, random_variables, vola, df_regression_param, 
        df_mean_reversion_param)
    year_price_simulation = mp.Simulation(mean_reversion_model, df_initial_price)

    year_prices = year_price_simulation.prices()
    monthly_prices = mp.MonthlyPrices(year_prices, monthly_factors_file="month_factors_mr.csv", 
        date_today=date_today, years_to_future=years_to_future, debug=False)

    df_S_monthly = monthly_prices.get_monthly_prices()

    spot_price_simulation = spm.SpotPrice(jumps, mr_regression, 1., 365, 0.0, df_S_monthly, 
        verbose=False)
    res = spot_price_simulation.get_simulated_spot_prices()
    # result_dict[i] = res  # only needed for the multiprocessing examples
    return res

And these were my attempts (they all run inside the if __name__ == '__main__': block):

multiprocessing

import multiprocessing

N = 10
MAX_WORKERS = 4
t0 = time.time()
pool = multiprocessing.Pool(processes=MAX_WORKERS)
t = pool.map(tt, range(N))  # dictionary in function is used to collect results
pool.close()
pool.join()
t1 = time.time() - t0
print("Elapsed time: {}s".format(t1))

-> never returns (the script hangs) ...

multiprocessing.pool

import multiprocessing.pool
N = 100
MAX_WORKERS = 4
t0 = time.time()
with multiprocessing.pool.ThreadPool(processes=MAX_WORKERS) as pool:
    t = pool.map(tt, range(N))  # dictionary in function is used to collect results
t1 = time.time() - t0
print("Elapsed time: {}s".format(t1))

-> no improvement, same calculation time as the for-loop

concurrent.futures

import concurrent.futures

N = 100
result_dict = dict().fromkeys(range(N))
MAX_WORKERS = 4
t0 = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    for idx, out in enumerate(executor.map(tt, range(N))):
        result_dict[idx] = out
t1 = time.time() - t0
print("Elapsed time: {}s".format(t1))

-> no improvement, same calculation time as the for-loop

asyncio

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

N = 10
result_dict = dict().fromkeys(range(N))
MAX_WORKERS = 4
t0 = time.time()
for i in range(N):
    result_dict[i] = tt(i)  # tt is wrapped with the background decorator in this attempt
t1 = time.time() - t0
print("Elapsed time: {}s".format(t1))

-> Error: asyncio await wasn't used with future

numpy apply method

import numpy as np
N = 100
test = np.zeros((N, 1))
t0 = time.time()
res = np.apply_along_axis(tt, 1, test)
t1 = time.time() - t0
print("Elapsed time: {}s".format(t1))

-> no improvement

1 Answer


Multithreading doesn't really help for Python code because of the global interpreter lock (GIL): only one thread at a time can execute Python bytecode and manipulate Python objects. Multithreading only pays off for calls into non-Python code (e.g. I/O or C routines) that release the GIL.
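
To illustrate the point, here is a small sketch (not from the question; the toy cpu_bound function and the worker/task counts are made up) comparing a thread pool with a process pool on a purely CPU-bound Python function:

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound(n):
    # pure-Python loop: holds the GIL for its whole runtime
    return sum(i * i for i in range(n))

def bench(executor_cls, n_tasks=4, n=2_000_000):
    t0 = time.time()
    with executor_cls(max_workers=4) as ex:
        list(ex.map(cpu_bound, [n] * n_tasks))
    return time.time() - t0

if __name__ == '__main__':
    print("threads:   {:.2f}s".format(bench(ThreadPoolExecutor)))   # roughly as slow as a plain loop
    print("processes: {:.2f}s".format(bench(ProcessPoolExecutor)))  # scales with the number of cores

The thread version takes about as long as running the tasks serially, while the process version gets faster with the number of physical cores.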

So you have to use multiprocessing.Pool instead. But on Windows, you must make your main code block conditional, like this:

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=MAX_WORKERS)
    ...etc...

otherwise, each worker will also attempt to start up a Pool and your system will hang. On Linux this is not necessary, because workers are created by forking rather than by spawning a fresh interpreter.

Edit: it seems that you did that.
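
For reference, a minimal self-contained sketch of that layout (the tt below is only a dummy stand-in for the question's simulation function, so that the snippet runs on its own):

import time
import multiprocessing
import numpy as np

def tt(i):
    # dummy placeholder for the question's tt(); any picklable module-level function works
    random_variables = np.random.standard_normal((3, 1000))
    return random_variables.cumsum(axis=1)

if __name__ == '__main__':
    N = 100
    MAX_WORKERS = 4
    t0 = time.time()
    with multiprocessing.Pool(processes=MAX_WORKERS) as pool:
        results = pool.map(tt, range(N))  # one entry per simulation run
    print("Elapsed time: {:.1f}s".format(time.time() - t0))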

Another thing to be aware of is that numpy in Anaconda uses multi-threaded Intel MKL routines for many numpy and scipy functions, especially the ones operating on large arrays. If those routines already keep all cores busy, adding your own multithreading or multiprocessing on top will be counterproductive.
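
If that turns out to be the cause, one option is to pin the BLAS/MKL thread pools to a single thread per process before numpy is imported, so that the process pool provides the parallelism instead (a sketch; the environment variables are respected by MKL/OpenBLAS, and threadpoolctl is a separate package that may not be installed):

import os

# must be set before numpy is imported for the first time
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"

import numpy as np

# alternatively, with the threadpoolctl package:
# from threadpoolctl import threadpool_limits
# with threadpool_limits(limits=1):
#     ...numpy-heavy code...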

Han-Kwang Nienhuys
  • Thanks, sorry I forgot to mention that I call all these methods in the main block. Will edit my post. – nomansland008 Jun 07 '20 at 21:31
  • Your edit gives me a new hint. I will research more about numpy and parallelization and give an update. Thank you. – nomansland008 Jun 08 '20 at 11:09
  • After doing some research (google: multiprocessing and numpy), a solution doesn't seem trivial. As seen here, one approach would be to split a numpy array and use the chunks as parameters for pool.map: https://www.reddit.com/r/learnpython/comments/3ovezc/multiprocessing_with_numpy_arrays/ I will not pursue this further for this particular case. Thank you for your advice. – nomansland008 Jun 10 '20 at 09:04