
I am pretty new to Dask and I am having some trouble using the `map_blocks` function. I am trying to execute a function on each element of a 2D array. Instead of creating two separate index arrays for i and j, I created a single array of size i * j.

ij = da.arange(n_users*n_ratings)
diff = da.map_blocks(compute_error, ij, dtype=np.float_).compute()

The function compute_error:

def compute_error(ij):
    i = int(ij/n_users)
    j = ij%n_users
    if not np.isnan(x[i,j]):
        return x[i, j] - np.dot(user_mat[j, :], ratings_mat[:, i])
    else:     
        return 0.0

Matrix x looks like:

1    NaN  NaN  NaN  5    2
NaN  NaN  NaN  NaN  4    NaN
NaN  3    NaN  NaN  4    NaN
NaN  3    NaN  NaN  NaN  NaN

Matrices user_mat (n_users X num_latent_features) and ratings_mat (num_latent_features X n_ratings), respectively:

float float
float float
float float
float float
float float
float float

float float float float
float float float float

I've read the documentation and searched Stack Overflow, but I still fail to fix the following problem:

KilledWorker                              Traceback (most recent call last)

<ipython-input-43-e670a6d660ce> in <module>
     12     # For each user-offer pair
     13     ij = da.arange(n_users*n_offers)
---> 14     diff = da.map_blocks(compute_error, ij, dtype=np.float_).compute()

c:\users\appdata\local\programs\python\python36\lib\site-packages\dask\base.py in compute(self, **kwargs)
    281         dask.base.compute
    282         
--> 283         (result,) = compute(self, traverse=False, **kwargs)
    284         return result
    285 

c:\users\appdata\local\programs\python\python36\lib\site-packages\dask\base.py in compute(*args, **kwargs)
    563         postcomputes.append(x.__dask_postcompute__())
    564 
--> 565     results = schedule(dsk, keys, **kwargs)
    566     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    567 

c:\users\appdata\local\programs\python\python36\lib\site-packages\distributed\client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2652                     should_rejoin = False
   2653             try:
-> 2654                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2655             finally:
   2656                 for f in futures.values():

c:\users\appdata\local\programs\python\python36\lib\site-packages\distributed\client.py in gather(self, futures, errors, direct, asynchronous)
   1967                 direct=direct,
   1968                 local_worker=local_worker,
-> 1969                 asynchronous=asynchronous,
   1970             )
   1971 

c:\users\appdata\local\programs\python\python36\lib\site-packages\distributed\client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    836         else:
    837             return sync(
--> 838                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    839             )
    840 

c:\users\appdata\local\programs\python\python36\lib\site-packages\distributed\utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    349     if error[0]:
    350         typ, exc, tb = error[0]
--> 351         raise exc.with_traceback(tb)
    352     else:
    353         return result[0]

c:\users\appdata\local\programs\python\python36\lib\site-packages\distributed\utils.py in f()
    332             if callback_timeout is not None:
    333                 future = asyncio.wait_for(future, callback_timeout)
--> 334             result[0] = yield future
    335         except Exception as exc:
    336             error[0] = sys.exc_info()

c:\users\appdata\local\programs\python\python36\lib\site-packages\tornado\gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

c:\users\appdata\local\programs\python\python36\lib\site-packages\distributed\client.py in _gather(self, futures, errors, direct, local_worker)
   1826                             exc = CancelledError(key)
   1827                         else:
-> 1828                             raise exception.with_traceback(traceback)
   1829                         raise exc
   1830                     if errors == "skip":

KilledWorker: ("('arange-compute_error-71748aa3c524bc2a5b920efa05deec65', 2)", <Worker 'tcp://127.0.0.1:50070', name: 0, memory: 0, processing: 4>)

I am also open to suggestions if there is a more efficient way to do this computation.

  • Shape of the input/output is probably fine, because it gives a different error. Not quite sure what this error even means. –  May 16 '21 at 12:27

1 Answer


Instead of operating on the ij array and converting its values to indices into the source arrays, use Dask to operate on the source arrays directly. It will be substantially faster.
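As an aside on why the original approach struggles: map_blocks calls the function once per chunk, handing it a whole NumPy array rather than one element, so scalar logic such as `int(ij/n_users)` does not behave as intended. A minimal sketch (not from the original code) illustrating that:

```python
import numpy as np
import dask.array as da

def show_block(block):
    # each call receives an entire chunk as a NumPy array,
    # and must return an array of a matching shape
    return np.full_like(block, block.size)

ij = da.arange(8, chunks=4)
out = da.map_blocks(show_block, ij, dtype=ij.dtype).compute()
print(out)  # [4 4 4 4 4 4 4 4] — two calls, one per 4-element chunk
```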

I created source arrays as:

  1. The source (Numpy) array to create x:

    arr = np.array([
        [1,      np.nan, np.nan, np.nan, 5,      2],
        [np.nan, np.nan, np.nan, np.nan, 4,      np.nan],
        [np.nan, 3,      np.nan, np.nan, 4,      np.nan],
        [np.nan, 3,      np.nan, np.nan, np.nan, np.nan]
    ])
    
  2. x array (from arr):

    x = da.from_array(arr, chunks=(2, 3))
    

    (I passed chunks to avoid creation of x as a single-chunk array).

  3. user_mat and ratings_mat:

    user_mat = np.arange(1, 13, dtype='float').reshape(6, 2)
    ratings_mat = np.arange(2, 10, dtype='float').reshape(2, 4)
    

    I created them as NumPy arrays, but the subsequent da operations convert them to dask arrays.
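As a quick illustration of what `chunks=(2, 3)` does (a sketch using an array of the same shape as arr): it splits the 4 x 6 array into four 2 x 3 tiles:

```python
import numpy as np
import dask.array as da

arr = np.ones((4, 6))               # same shape as the sample arr
x = da.from_array(arr, chunks=(2, 3))

# chunks reports the block sizes along each axis: two row-blocks
# of 2 and two column-blocks of 3, i.e. four 2x3 tiles in total
print(x.chunks)     # ((2, 2), (3, 3))
print(x.numblocks)  # (2, 2)
```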

The actual operation is:

result = da.where(da.notnull(x), da.subtract(x, da.dot(user_mat, ratings_mat).T), 0).compute()

Steps:

  • da.notnull(x) - the result selection criterion (either subtraction or zero),
  • da.subtract(...) - the subtraction (first result),
  • 0 - second result (for NaN elements in x),
  • da.where(...) - the recipe of what to compute,
  • compute() - actual computation.
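For a quick sanity check, the same recipe can be reproduced with plain NumPy (eagerly evaluated, using the sample arr, user_mat and ratings_mat defined above):

```python
import numpy as np

arr = np.array([
    [1,      np.nan, np.nan, np.nan, 5,      2],
    [np.nan, np.nan, np.nan, np.nan, 4,      np.nan],
    [np.nan, 3,      np.nan, np.nan, 4,      np.nan],
    [np.nan, 3,      np.nan, np.nan, np.nan, np.nan],
])
user_mat = np.arange(1, 13, dtype='float').reshape(6, 2)
ratings_mat = np.arange(2, 10, dtype='float').reshape(2, 4)

# Same where/subtract recipe as the Dask version, but eager
expected = np.where(~np.isnan(arr), arr - (user_mat @ ratings_mat).T, 0)
print(expected[0, 0])  # -13.0, matching the Dask result below
```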

The result, for the above data, is:

array([[ -13.,    0.,    0.,    0.,  -73.,  -92.],
       [   0.,    0.,    0.,    0.,  -93.,    0.],
       [   0.,  -41.,    0.,    0., -112.,    0.],
       [   0.,  -48.,    0.,    0.,    0.,    0.]])
Valdi_Bo
  • What does `chunks=(2, 3)` do? I actually read my `x` matrix from an h5 file: `x = da.from_array(hf['/data_1'])`, which creates chunks automatically. Should I set it manually? –  May 16 '21 at 15:17
  • I passed *chunks* only to avoid a single-chunk array, but you can omit this parameter and stay with the default chunking offered by *dask*. – Valdi_Bo May 16 '21 at 15:25
  • The solution works as stated. I will accept the answer. One more question: what would be the most efficient way to execute another function (squares of each element of the array) on the result? It has to be a separate function because I need the intermediate result for another computation. The result is a numpy array, so map_blocks does not work on it. –  May 16 '21 at 15:32
  • 1
    See https://stackoverflow.com/questions/35215161/most-efficient-way-to-map-function-over-numpy-array It contains a discussion of various approaches. – Valdi_Bo May 16 '21 at 15:41
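Regarding the follow-up in the comments, one option (a sketch, not part of the accepted answer) is to delay `compute()` so the intermediate result stays a dask array; then `da.square` can be applied lazily and both results materialized in one pass with `dask.compute`:

```python
import numpy as np
import dask
import dask.array as da

# small stand-in for the real x; values are illustrative only
x = da.from_array(np.array([[1.0, np.nan], [3.0, 4.0]]), chunks=(1, 2))

diff = da.where(da.notnull(x), x - 1.0, 0)  # stays lazy (dask array)
squared = da.square(diff)                   # also lazy; builds on diff

# materialize both at once so the shared work is not repeated
diff_np, squared_np = dask.compute(diff, squared)
```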