I'm trying to convert my Monte Carlo simulation from numpy to dask, because sometimes the arrays are too large to fit into memory. I have therefore set up a cluster of computers in the cloud: my dask cluster consists of 24 cores and 94 GB of memory. I prepared a simplified version of my code for this question.
My original numpy code looks like this:
def numpy_way(sim_count, sim_days, hist_days):
    historical_data = np.random.normal(111.51, 10, hist_days)
    historical_multidim = np.empty(shape=(1, 1, sim_count, hist_days))
    historical_multidim[:, :, :, :] = historical_data
    random_days_panel = np.random.randint(low=1,
                                          high=hist_days,
                                          size=(1, 1, sim_count, sim_days))
    future_panel = historical_multidim[np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
                                       np.arange(1)[:, np.newaxis, np.newaxis],
                                       np.arange(sim_count)[:, np.newaxis],
                                       random_days_panel]
    return future_panel.shape
Note: I'm only returning the shape of the array here (but since this is numpy, the elements of future_panel are computed in memory).
Some words about the function:
- I'm creating a random array historical_data - this is only 1D
- Then this array is 'broadcast' to a 4D array (historical_multidim). The first two dimensions are not used here (but they are in my final application), the 3rd dimension represents how many simulations are done, and the 4th dimension is the number of days forecast into the future
- random_days_panel is just an ndarray of randomly chosen days, so the final shape of this array is (1, 1, sim_count, sim_days) (explained in the previous point)
- future_panel is an ndarray with the values randomly picked from historical_multidim, i.e. an array generated from the historical data with the expected shape (1, 1, sim_count, sim_days)
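To make the indexing step concrete: since the first three index arrays only broadcast over identical copies of the data, the 4D fancy indexing should pick exactly the same values as indexing the 1D historical series directly. A small sanity check with hypothetical tiny sizes:

```python
import numpy as np

# Hypothetical small sizes, just for illustration
sim_count, sim_days, hist_days = 3, 4, 5
historical_data = np.random.normal(111.51, 10, hist_days)

# Broadcast the 1-D series into the 4-D panel, as in numpy_way
historical_multidim = np.empty((1, 1, sim_count, hist_days))
historical_multidim[:, :, :, :] = historical_data

random_days_panel = np.random.randint(1, hist_days, size=(1, 1, sim_count, sim_days))

future_panel = historical_multidim[np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
                                   np.arange(1)[:, np.newaxis, np.newaxis],
                                   np.arange(sim_count)[:, np.newaxis],
                                   random_days_panel]

assert future_panel.shape == (1, 1, sim_count, sim_days)
# Same values as indexing the 1-D series directly with the 4-D index array
assert np.array_equal(future_panel, historical_data[random_days_panel])
```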
Now, the problem is that some of these steps are not implemented in dask:
- historical_multidim[:, :, :, :] = historical_data - using stack or broadcast_to is recommended instead
- the fancy indexing used to build future_panel is not possible in dask
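The broadcasting part on its own does work with da.broadcast_to, and unlike the np.empty-plus-assignment pattern it is lazy, so no 4D copy is materialized. A minimal sketch with hypothetical small sizes:

```python
import numpy as np
import dask.array as da

# Hypothetical small sizes, just for illustration
hist_days, sim_count = 5, 3
historical_data = da.random.normal(111.51, 10, size=hist_days, chunks='auto')

# Lazy replacement for: historical_multidim[:, :, :, :] = historical_data
historical_multidim = da.broadcast_to(historical_data,
                                      shape=(1, 1, sim_count, hist_days))

assert historical_multidim.shape == (1, 1, sim_count, hist_days)
# Every row along the 3rd axis is the same 1-D historical series
assert np.allclose(historical_multidim[0, 0, 0].compute(),
                   historical_data.compute())
```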
So I came out with this solution:
def dask_way_1d(sim_count, sim_days, hist_days):
    historical_data = da.random.normal(111.51, 10, size=hist_days, chunks='auto')

    def get_random_days_1d():
        return np.random.randint(low=1, high=hist_days, size=sim_days)

    future_simulations = [historical_data[get_random_days_1d()] for _ in range(sim_count)]
    future_panel = da.stack(future_simulations)
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape
This solution works, but it is much, much slower than the numpy solution. The problem is that get_random_days_1d() returns a numpy array. I tried using a dask array for the indices instead, but got an error when computing historical_data[get_random_days_1d()]:
KilledWorker: ("('normal-932553ab53ba4c7e908d61724430bbb2', 0)", ...
Another solution looks like this:
def dask_way_nd(sim_count, sim_days, hist_days):
    historical_data_1d = da.random.normal(111.51, 10, size=hist_days, chunks='auto')
    historical_data_2d = da.broadcast_to(historical_data_1d, shape=(sim_count, hist_days))
    random_days_panel = np.random.randint(low=1,
                                          high=hist_days,
                                          size=(sim_count, sim_days))
    future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel]
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape
This solution fails on future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel] with the error:
NotImplementedError: Don't yet support nd fancy indexing
So my question is: is there any way to implement the same behavior as in my numpy code? And of course I want to achieve better performance (i.e. faster execution time).
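One workaround I'm considering (not benchmarked yet, so this is only a sketch): since every index points into the same 1D historical series, I could flatten the index panel so the fancy indexing stays 1D, which dask does support, and reshape the result afterwards:

```python
import numpy as np
import dask.array as da

def dask_way_flat(sim_count, sim_days, hist_days):
    # 1-D historical series as a lazy dask array
    historical_data = da.random.normal(111.51, 10, size=hist_days, chunks='auto')
    # The indices themselves are small (sim_count * sim_days ints),
    # so generating them eagerly with numpy should be fine
    random_days = np.random.randint(low=1, high=hist_days,
                                    size=sim_count * sim_days)
    # 1-D fancy indexing with a numpy integer array is supported by dask;
    # reshape the flat result back into the target 4-D panel
    future_panel = historical_data[random_days].reshape(1, 1, sim_count, sim_days)
    return future_panel

# Hypothetical small sizes, just for illustration
panel = dask_way_flat(sim_count=3, sim_days=4, hist_days=5)
assert panel.compute().shape == (1, 1, 3, 4)
```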