How can one apply some function in parallel on chunks of a sparse CSR array saved on disk, using Python? Sequentially this could be done e.g. by saving the CSR array with joblib.dump, opening it with joblib.load(.., mmap_mode="r"), and processing the chunks of rows one by one. Could this be done more efficiently with dask?
In particular, assume one doesn't need all of the possible out-of-core operations on sparse arrays, but just the ability to load row chunks in parallel (each chunk being a CSR array) and apply some function to them (in my case it would be e.g. estimator.predict(X) from scikit-learn).
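Concretely, the sequential baseline I have in mind looks roughly like this (here X_csr stands for the CSR array and estimator for any fitted scikit-learn model; both are placeholders),
import joblib
from sklearn.utils import gen_batches

joblib.dump(X_csr, 'X_csr.pkl')  # X_csr: a scipy.sparse CSR array (placeholder)

# the numpy arrays backing the CSR matrix are memory-mapped on load
fh = joblib.load('X_csr.pkl', mmap_mode='r')

predictions = []
for sl in gen_batches(fh.shape[0], 2000):
    # slicing rows materializes one chunk in memory as a CSR array
    predictions.append(estimator.predict(fh[sl]))  # estimator: placeholder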
Besides, is there a file format on disk that would be suitable for this task? Joblib works, but I'm not sure about the (parallel) performance of CSR arrays loaded as memory maps; spark.mllib appears to use either some custom sparse storage format (which doesn't seem to have a pure Python parser) or the LIBSVM format (whose parser in scikit-learn is, in my experience, much slower than joblib.dump)...
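To clarify what I mean by memory-mapped CSR storage: as far as I understand, joblib essentially persists the three numpy arrays backing the CSR matrix, so a hand-rolled equivalent would look roughly like this (file names are arbitrary),
import numpy as np
import scipy.sparse

def save_csr(path_prefix, X):
    # a CSR matrix is fully described by three dense arrays plus its shape
    np.save(path_prefix + '_data.npy', X.data)
    np.save(path_prefix + '_indices.npy', X.indices)
    np.save(path_prefix + '_indptr.npy', X.indptr)
    np.save(path_prefix + '_shape.npy', np.asarray(X.shape))

def load_csr_mmap(path_prefix):
    # load each component as a memory map
    data = np.load(path_prefix + '_data.npy', mmap_mode='r')
    indices = np.load(path_prefix + '_indices.npy', mmap_mode='r')
    indptr = np.load(path_prefix + '_indptr.npy', mmap_mode='r')
    shape = tuple(np.load(path_prefix + '_shape.npy'))
    return scipy.sparse.csr_matrix((data, indices, indptr), shape=shape)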
Note: I have read the documentation and various related issues on https://github.com/dask/dask/ but I'm still not sure how best to approach this problem.
Edit: to give a more practical example, below is code that works in dask for dense arrays but fails with this error when sparse arrays are used,
import numpy as np
import scipy.sparse
import joblib
import dask.array as da
from sklearn.utils import gen_batches
np.random.seed(42)
joblib.dump(np.random.rand(100000, 1000), 'X_dense.pkl')
joblib.dump(scipy.sparse.random(10000, 1000000, format='csr'), 'X_csr.pkl')
fh = joblib.load('X_dense.pkl', mmap_mode='r')
# computing the results without dask
batch_size = 2000
results = np.vstack([fh[sl, :].sum(axis=1)
                     for sl in gen_batches(fh.shape[0], batch_size)])
# computing the results with dask
x = da.from_array(fh, chunks=(2000))
results = x.sum(axis=1).compute()
Edit2: following the discussion below, the example below overcomes the previous error but fails with IndexError: tuple index out of range in dask/array/core.py:L3413,
import dask
# +imports from the example above
dask.set_options(get=dask.get) # disable multiprocessing
fh = joblib.load('X_csr.pkl', mmap_mode='r')
def func(x):
    if x.ndim == 0:
        # dask does some heuristics with dummy data; if x is a 0d array
        # the sum command below would fail
        return x
    res = np.asarray(x.sum(axis=1, keepdims=True))
    return res
Xd = da.from_array(fh, chunks=(2000))
results_new = Xd.map_blocks(func).compute()
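For reference, what I'm ultimately trying to express is something along the lines of the sketch below, which side-steps dask.array and just maps a function over row chunks with dask.delayed (func_on_chunk is a stand-in for e.g. estimator.predict, and the row slicing happens inside each task so that chunks are loaded by the scheduler's workers); I'm not sure whether this is the recommended approach,
import dask
from dask import delayed
# + numpy, joblib and gen_batches imports from the examples above

def func_on_chunk(fh, sl):
    # load one chunk of rows and apply the per-chunk computation
    # (placeholder for e.g. estimator.predict)
    X_chunk = fh[sl]
    return np.asarray(X_chunk.sum(axis=1))

fh = joblib.load('X_csr.pkl', mmap_mode='r')
tasks = [delayed(func_on_chunk)(fh, sl)
         for sl in gen_batches(fh.shape[0], 2000)]
results_delayed = np.vstack(dask.compute(*tasks))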