What is the most efficient way to apply a function to each column of a Dask array? As documented below, I've tried a number of things but I still suspect that my use of Dask is rather amateurish.
I have an array that is both quite wide and quite long, in the region of 3,000,000 x 10,000. I want to apply the ecdf function (defined below) to each column of this array. The individual column results, stacked together, should yield an array with the same dimensions as the input array.
Consider the following tests and let me know which approach is preferable or how I can improve. I know I could simply pick the fastest one, but I really want to exploit the possibilities of Dask to the fullest, and the arrays could also be several times bigger. At the same time, my benchmark results are surprising to me; maybe I don't understand the logic behind Dask correctly.
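For concreteness, here is a tiny toy example of the per-column operation I have in mind (the numbers are made up and only illustrate the expected values and that the output has the same shape as the input):

import numpy as np
from statsmodels.distributions.empirical_distribution import ECDF

toy = np.array([[3., 1.],
                [1., 2.],
                [2., 3.]])
# evaluate each column's empirical CDF at that column's own values
out = np.column_stack([ECDF(toy[:, j])(toy[:, j]) for j in range(toy.shape[1])])
print(out.shape)   # (3, 2) -- same shape as the input
print(out[:, 0])   # [1.0, 0.333..., 0.666...], i.e. rank / n for each value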
import numpy as np
import dask
import dask.array as da
from dask.distributed import Client, LocalCluster
from statsmodels.distributions.empirical_distribution import ECDF
### functions
def ecdf(x):
    fn = ECDF(x)
    return fn(x)

def ecdf_array(X):
    res = np.zeros_like(X)
    for i in range(X.shape[1]):
        res[:,i] = ecdf(X[:,i])
    return res
### set up scheduler / workers
n_workers = 10
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=4)
client = Client(cluster)
### create data set
X = da.random.random((100000,100)) #dask
Xarr = X.compute() #numpy
### traditional for loop
%timeit -r 10 foo1 = ecdf_array(Xarr)
### adjusting chunk size to 2d-array and map_blocks
X = X.rechunk(chunks=(X.shape[0],np.ceil(X.shape[1]/n_workers)))
Xm = X.map_blocks(lambda x: ecdf_array(x),meta = np.array((), dtype='float'))
%timeit -r 10 foo2 = Xm.compute()
### adjusting chunk size to column size and map_blocks
X = X.rechunk(chunks=(X.shape[0],1))
Xm = X.map_blocks(lambda x: np.expand_dims(ecdf(np.squeeze(x)),1),meta = np.array((), dtype='float'))
%timeit -r 10 foo3 = Xm.compute()
### map over columns by slicing
Xm = client.map(lambda i: ecdf(np.asarray(X[:,i])),range(X.shape[1]))
Xm = client.submit(lambda x: da.transpose(da.vstack(x)),Xm)
%timeit -r 10 foo4 = Xm.result()
### apply_along_axis
Xaa = da.apply_along_axis(lambda x: np.expand_dims(ecdf(x),1), 0, X, dtype=X.dtype, shape=X.shape)
%timeit -r 10 foo5 = Xaa.compute()
### lazy loop
Xl = []
for i in range(X.shape[1]):
    Xl.append(dask.delayed(ecdf)(X[:,i]))
Xl = dask.delayed(da.vstack)(Xl)
%timeit -r 10 foo6 = Xl.compute()
According to my benchmarks, "map over columns by slicing" is the fastest approach, followed by "adjusting chunk size to column size & map_blocks" and the non-parallel "apply_along_axis".
| Method | Result (10 runs) |
| --- | --- |
| traditional for loop | 2.16 s ± 82.3 ms |
| adjusting chunk size to 2d-array & map_blocks | 1.26 s ± 301 ms |
| adjusting chunk size to column size & map_blocks | 926 ms ± 31.9 ms |
| map over columns by slicing | 316 ms ± 11.5 ms |
| apply_along_axis | 1.01 s ± 18.7 ms |
| lazy loop | 1.4 s ± 352 ms |
Based on my understanding of the idea behind Dask, I would have expected the "adjusting chunk size to 2d-array & map_blocks" method to be the fastest. The two approaches that performed best don't seem very "Dasky", while the non-parallel apply_along_axis is ranked third. All of that gives me the suspicion that I am doing something wrong.
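For what it's worth, here is a minimal sketch of how I check the chunk layouts produced by the two rechunk calls (standalone, assuming the same array shape and worker count as above; the commented outputs are what I expect for this shape):

import numpy as np
import dask.array as da

X = da.random.random((100000, 100))
n_workers = 10

# one tall chunk per group of ~10 columns -> 10 blocks, one per worker
X_blocks = X.rechunk(chunks=(X.shape[0], int(np.ceil(X.shape[1] / n_workers))))
print(X_blocks.chunks)   # ((100000,), (10, 10, ..., 10))

# one tall chunk per single column -> 100 blocks
X_cols = X.rechunk(chunks=(X.shape[0], 1))
print(X_cols.chunks)     # ((100000,), (1, 1, ..., 1))

Given that the 10-block layout matches the number of workers exactly, that was the variant I expected to win.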