31

I'm trying to convert my monte carlo simulation from numpy into dask, because sometimes the arrays are too large, and can't fit into the memory. Therefore I set up a cluster of computers in the cloud: My dask cluster consists of 24 cores and 94 GB of memory. I prepared a simplified version of my code for this question.

My original numpy code looks like this:

def numpy_way(sim_count, sim_days, hist_days):
   historical_data = np.random.normal(111.51, 10, hist_days)
   historical_multidim = np.empty(shape=(1, 1, sim_count, hist_days))
   historical_multidim[:, :, :, :] = historical_data


   random_days_panel = np.random.randint(low=1,
                                      high=hist_days,
                                      size=(1, 1, sim_count, sim_days))
   future_panel = historical_multidim[np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
                                      np.arange(1)[:, np.newaxis, np.newaxis],
                                      np.arange(sim_count)[:, np.newaxis],
                                      random_days_panel]
   return future_panel.shape

Note: I'm just returning here the shape of the numpy array (but as it is numpy the elements of future_panel are cumputed in memory.

Some words about the function:

  • I'm creating a random array historical_data - this is only 1D
  • Then this array is 'broadcasted' to a 4D array (historical_multidim). First two dimensions are not used here (but they are in my final application)
    • 3rd dimension represents how many simulations are done
    • 4th dimension is the number of days forecasted in the future
  • random_days_panel - is just an ndarray of randomly chosen days. So the final shape of this array is: 1, 1, sim_count, sim_days (explained in the previous point)
  • future_panel is an ndarray with the randomly picked values from historical_multidim. I.e. an array generated from historical data having the expected shape (1, 1, sim_count, sim_days)

Now, the problem is, that some of these steps are not implemented in dask:

  • historical_multidim[:, :, :, :] = historical_data - stack or broadcast_to is recommended to use
  • the slicing used in future_panel is not possible in dask

So I came out with this solution:

def dask_way_1d(sim_count, sim_days, hist_days):
    historical_data = da.random.normal(111.51, 10, size=hist_days, chunks='auto')
    def get_random_days_1d():
        return np.random.randint(low=1, high=HIST_DAYS, size=sim_days)
    future_simulations = [historical_data[get_random_days_1d()] for _ in range(sim_count)]
    future_panel =  da.stack(future_simulations)
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape

This solution works, however it is much much slower than the numpy solution. The problem is, that get_random_days_1d() returns a numpy array. I tried to use dask array, but get into an error when computing historical_data[get_random_days_1d()] -> KilledWorker: ("('normal-932553ab53ba4c7e908d61724430bbb2', 0)", ...

Another solution looks like this:

def dask_way_nd(sim_count, sim_days, hist_days):
    historical_data_1d = da.random.normal(111.51, 10, size=hist_days, chunks='auto')
    historical_data_2d = da.broadcast_to(historical_data_1d, shape=(sim_count, hist_days))
    random_days_panel = np.random.randint(low=1,
                                          high=hist_days,
                                          size=(sim_count, sim_days))
    future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel]
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape

This solution stops on future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel] -> The error is: NotImplementedError: Don't yet support nd fancy indexing

So my question is, Is there any way to implement the same behavior as in my numpy code? But of course I want achhieve better performance (i.e. faster execution time)

Nima
  • 404
  • 3
  • 14
patex1987
  • 407
  • 1
  • 5
  • 10
  • have you had a look at the [multiprocessing library](https://docs.python.org/2/library/multiprocessing.html)? – DrBwts Aug 23 '18 at 11:57
  • Yes, of course. Actually, I am investigating what are the possible solutions to parallelize my current solution. The problem is that sometimes I have too large datasets which can't fit into memory on a single computer, so I wanted try out some solution with distributed computing – patex1987 Aug 23 '18 at 12:08
  • 4
    I had similar issues with an incomplete `pandas.Dataframe` API. I had to reimplement my problem in a map-reduce way, dask offers such functionality like `scatter`, `map` and `gather`. Could you call your function for every value in `hist_days` or one of the other arguments ? – rocksportrocker Aug 28 '18 at 11:47
  • 1
    FWIW I find that questions that focus on how to use a particular feature are much more commonly answered than questions of the form "here is my full workflow, please help" – MRocklin Sep 02 '18 at 12:54
  • @rocksportrocker Sorry, I need some time to return back to this topic. But basically what you are suggesting is to split the data into smaller chunks - `scatter` it to the distributed memory - do the calculation there, and `gather` the results? I need to dig deeper into dask – patex1987 Sep 04 '18 at 11:56
  • @MRocklin Do you think it would be a good idea to create another question focusing only on the main problem here? – patex1987 Sep 04 '18 at 11:57
  • 1
    @patex1987 yes, this worked for me within a small experiment trying to move something from an existing HPC batch system based analysis to dask. – rocksportrocker Sep 04 '18 at 11:57
  • Now, this might not answer your question, but it looks to me, you can change your code to a more memory efficient solution. def my_numpy_way(sim_count, sim_days, hist_days): historical_data = np.random.normal(111.51, 10, size=hist_days).astype(np.float32) future_panel = historical_data[..., np.random.randint(low=1, high=hist_days, size=(1, 1, sim_count, sim_days)).astype(np.int32)] return future_panel.shape – lmielke Feb 05 '22 at 12:42

1 Answers1

1

Chunk random_days_panel instead of historical_data and use da.map_blocks:

def dask_way(sim_count, sim_days, hist_days):
    # shared historical data
    # on a cluster you'd load this on each worker, e.g. from a NPZ file
    historical_data = np.random.normal(111.51, 10, size=hist_days)

    random_days_panel = da.random.randint(
        1, hist_days, size=(1, 1, sim_count, sim_days)
    )
    future_panel = da.map_blocks(
        lambda chunk: historical_data[chunk], random_days_panel, dtype=float
    )

    future_panel.compute()

    return future_panel

This will delegate all the work to the workers and will become faster than pure numpy once your problem gets large enough to amortize the initial (constant) cost for bringing up the scheduler and distributing the context:

hist_days = int(1e6)
sim_days = int(1e5)
sim_count = int(1000)

# dask_way(sim_count, sim_days, hist_days)
# (code in this answer)
532 ms ± 53.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# numpy_way(sim_count, sim_days, hist_days)
# (code in the question)
4.47 s ± 79.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# dask_way_1d(sim_count, sim_days, hist_days)
# (code in the question)
5.76 s ± 91.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
FirefoxMetzger
  • 2,880
  • 1
  • 18
  • 32