I'm using the dask.delayed decorator to create a lazy data cube:
import dask
import dask.array
import numpy as np

@dask.delayed
def _delayed_func(x: int, y: int, z: int) -> np.ndarray:
    # Each chunk is 1000**3 float64 values, i.e. ~8 GB of random data.
    return np.random.randn(1000, 1000, 1000)
I use this function to build a lazy data cube:
def create_cube(size):
    # Assemble a (1000*size)**3 cube out of lazy 1000**3 chunks:
    # z varies along axis 2, x along axis 1, y along axis 0.
    data = dask.array.concatenate(
        [
            dask.array.concatenate(
                [
                    dask.array.concatenate(
                        [
                            dask.array.from_delayed(
                                _delayed_func(x, y, z),
                                shape=(1000, 1000, 1000),
                                dtype=float,
                            )
                            for z in range(size)
                        ],
                        axis=2,
                    )
                    for x in range(size)
                ],
                axis=1,
            )
            for y in range(size)
        ],
        axis=0,
    )
    return data
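Nothing is materialized at this point; the cube stays lazy until compute is called. A quick sanity check (size=2 is just an illustrative value):

cube = create_cube(2)
print(cube.shape)           # (2000, 2000, 2000)
print(cube.nbytes / 2**30)  # ~59.6 GiB described by the graph, 0 bytes in memory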
I'm running the KubeRay operator on top of OpenShift with autoscaling enabled.
My cluster has 4 TiB of main memory, but the lazy cube is 440 TiB in size. I'm computing the mean over all axes, and each Dask task/Ray worker combination only needs to compute the mean of a single chunk, which is < 8 GiB.
The problem is that Dask submits all 233,280 tasks to Ray at once; the autoscaler tries to start additional workers, but eventually has to kill them again because of out-of-memory (OOM) errors.
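For reference, this is roughly how I trigger the computation (a sketch: enable_dask_on_ray is the ray.util.dask helper that makes Ray the default Dask scheduler, and the size value here is illustrative, not the real 440 TiB cube):

import ray
from ray.util.dask import enable_dask_on_ray

ray.init(address="auto")        # connect to the running KubeRay cluster
enable_dask_on_ray()            # route all Dask task graphs to Ray

cube = create_cube(6)           # illustrative size; the real cube is much larger
result = cube.mean().compute()  # one mean task per chunk, then a tree reduction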
The only way I've found to make it work is the following annotation (shown here wrapping the compute call):

with dask.annotate(ray_remote_args=dict(num_cpus=10)):
    result = cube.mean().compute()

This prevents too many tasks from starting at once: the cluster has 10,000 CPUs, so reserving 10 CPUs per task caps concurrency at 1,000 parallel workers, but it wastes 80% of my CPUs :)
Is there a better way to limit the number of parallel Ray workers?