I am pretty new to Dask and I am having some trouble using function map_blocks
. I am trying to execute a function on each element of a 2D array. Instead of creating 2 arrays for indexes i
and j
, I created 1 of size i * j
.
ij = da.arange(n_users*n_ratings)
diff = da.map_blocks(compute_error, ij, dtype=np.float_).compute()
The function compute_error
:
def compute_error(ij):
i = int(ij/n_users)
j = ij%n_users
if not np.isnan(x[i,j]):
return x[i, j] - np.dot(user_mat[j, :], ratings_mat[:, i])
else:
return 0.0
Matrix x
looks like:
1 Nan Nan Nan 5 2
Nan Nan Nan Nan 4 Nan
Nan 3 Nan Nan 4 Nan
Nan 3 Nan Nan Nan Nan
Matrix user_mat
(n_users X num_latent_features) and ratings_mat
(num_latent_features X num_ratings) respectively:
float float float float float float
float float float float float float
float float
float float
float float
float float
I've read the documentation and searched stackoverlow but I still fail to fix the following problem:
KilledWorker Traceback (most recent call last)
<ipython-input-43-e670a6d660ce> in <module>
12 # For each user-offer pair
13 ij = da.arange(n_users*n_offers)
---> 14 diff = da.map_blocks(compute_error, ij, dtype=np.float_).compute()
c:\users\appdata\local\programs\python\python36\lib\site-packages\dask\base.py in compute(self, **kwargs)
281 dask.base.compute
282
--> 283 (result,) = compute(self, traverse=False, **kwargs)
284 return result
285
c:\users\appdata\local\programs\python\python36\lib\site-packages\dask\base.py in compute(*args, **kwargs)
563 postcomputes.append(x.__dask_postcompute__())
564
--> 565 results = schedule(dsk, keys, **kwargs)
566 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
567
c:\users\appdata\local\programs\python\python36\lib\site-packages\distributed\client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2652 should_rejoin = False
2653 try:
-> 2654 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2655 finally:
2656 for f in futures.values():
c:\users\appdata\local\programs\python\python36\lib\site-packages\distributed\client.py in gather(self, futures, errors, direct, asynchronous)
1967 direct=direct,
1968 local_worker=local_worker,
-> 1969 asynchronous=asynchronous,
1970 )
1971
c:\users\appdata\local\programs\python\python36\lib\site-packages\distributed\client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
836 else:
837 return sync(
--> 838 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
839 )
840
c:\users\appdata\local\programs\python\python36\lib\site-packages\distributed\utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
349 if error[0]:
350 typ, exc, tb = error[0]
--> 351 raise exc.with_traceback(tb)
352 else:
353 return result[0]
c:\users\appdata\local\programs\python\python36\lib\site-packages\distributed\utils.py in f()
332 if callback_timeout is not None:
333 future = asyncio.wait_for(future, callback_timeout)
--> 334 result[0] = yield future
335 except Exception as exc:
336 error[0] = sys.exc_info()
c:\users\appdata\local\programs\python\python36\lib\site-packages\tornado\gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
c:\users\appdata\local\programs\python\python36\lib\site-packages\distributed\client.py in _gather(self, futures, errors, direct, local_worker)
1826 exc = CancelledError(key)
1827 else:
-> 1828 raise exception.with_traceback(traceback)
1829 raise exc
1830 if errors == "skip":
KilledWorker: ("('arange-compute_error-71748aa3c524bc2a5b920efa05deec65', 2)", <Worker 'tcp://127.0.0.1:50070', name: 0, memory: 0, processing: 4>)
I am also opend to suggestions if there is any more efficient way to do this computation.