I'm using the dask.delayed decorator to create the chunks of a lazy data cube:

import dask.array  # also binds the top-level `dask` package
import numpy as np

@dask.delayed
def _delayed_func(x: int, y: int, z: int) -> np.ndarray:
    return np.random.randn(1000, 1000, 1000)

I use this function to create lazy data cubes

def create_cube(size):
    data = dask.array.concatenate(
    [
        dask.array.concatenate(
            [
                dask.array.concatenate(
                    [
                        dask.array.from_delayed(
                            _delayed_func(x, y, z),
                            shape=(1000,1000,1000),
                            dtype=float
                        )   
                        for z in range(size)
                    ], axis=2
                )
                for x in range(size)
            ], axis=1
        )
        for y in range(size)
    ], axis=0
    )
    return data

I'm using the KubeRay operator on top of OpenShift and have autoscaling enabled.

My cluster has 4 TiB of main memory, but my lazy cube is 440 TiB in size. I'm computing the mean over all axes. Each Dask task (one per Ray worker) only needs to compute the mean of a single chunk, which is < 8 GiB.
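To make these numbers concrete, here is a rough back-of-the-envelope sketch. It assumes float64 elements (what dtype=float means in NumPy) and ignores any per-task overhead, so the result is an upper bound on how many chunks can be resident at once, not a tuning recommendation.

```python
# Sizes taken from the question; float64 elements are an assumption (dtype=float).
BYTES_PER_FLOAT64 = 8
GIB = 1024 ** 3
TIB = 1024 ** 4

# One chunk is a 1000 x 1000 x 1000 array.
chunk_bytes = 1000 * 1000 * 1000 * BYTES_PER_FLOAT64
chunk_gib = chunk_bytes / GIB

# Upper bound on chunks that fit into 4 TiB of cluster memory at once.
cluster_bytes = 4 * TIB
max_resident_chunks = cluster_bytes // chunk_bytes

print(f"one chunk: {chunk_gib:.2f} GiB")
print(f"chunks that fit in 4 TiB: {max_resident_chunks}")
```

Under these assumptions only a few hundred chunks fit in memory at any one time, which is why unbounded parallelism runs into OOM.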

The problem is that Dask submits all 233280 tasks to Ray at once. Ray then tries to start additional workers, but eventually has to kill them again because of out-of-memory (OOM) errors.

The only way I have found to make it work is the following annotation:

with dask.annotate(ray_remote_args=dict(num_cpus=10)):

This prevents too many workers from being started at the same time. The cluster has 10000 CPUs, so I only get 1000 parallel workers, but I'm wasting 80% of my CPUs :)

Is there a better way to limit the number of parallel Ray workers?

Romeo Kienzler

1 Answer

It's possible to control the number of tasks allocated to a worker using resources. There is a related answer explaining an example of resources and another answer that uses annotations, similar to the snippet you provided. You would need to make two adjustments to the snippet:

  1. Instantiate workers with the desired resource, e.g.:
cluster = LocalCluster(n_workers=2, memory_limit='4GB', resources={'foo': 1})
  2. Add a resource annotation to the delayed graph:
with dask.annotate(resources=dict(foo=1)):
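Put together, the two adjustments might look like the minimal sketch below. It targets a dask.distributed LocalCluster as in the snippets above and has not been verified against Dask-on-Ray. The function chunk_mean and the resource name "foo" are hypothetical placeholders, processes=False is an assumption made only to keep the example in a single process, and optimize_graph=False is passed because annotations can be lost during graph optimization.

```python
import dask
from dask.distributed import Client, LocalCluster


@dask.delayed
def chunk_mean(i: int) -> float:
    # Hypothetical stand-in for the per-chunk work in the question.
    return float(i)


# Each worker advertises one unit of the made-up resource "foo".
# processes=False (an assumption) keeps the workers inside this process.
cluster = LocalCluster(
    n_workers=2, threads_per_worker=4, processes=False, resources={"foo": 1}
)
client = Client(cluster)

# Every task created inside this block requires one unit of "foo",
# so a worker runs at most one of them at a time despite having 4 threads.
with dask.annotate(resources={"foo": 1}):
    parts = [chunk_mean(i) for i in range(8)]
    total = dask.delayed(sum)(parts)

# Skip graph optimization so the annotations are not dropped.
result = total.compute(optimize_graph=False)
print(result)

client.close()
cluster.close()
```

With one unit of "foo" per worker, the number of annotated tasks running at once is bounded by the number of workers rather than by the number of threads or CPUs.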

PS: if the real workflow is just creating a random cube, then it might be worth generating the desired shape via dask.array.random.
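As a rough illustration of that suggestion, the sketch below builds a chunked random cube directly with dask.array.random.standard_normal instead of nested concatenate calls. The shapes are deliberately tiny so it runs anywhere; the names chunk and size are placeholders, and in the question's setting the chunk would be (1000, 1000, 1000).

```python
import dask.array as da

# Tiny stand-in sizes (assumptions); the question uses 1000^3 chunks.
chunk = (10, 10, 10)
size = 3  # number of chunks along each axis

# One lazy array with size^3 chunks; nothing is generated until compute().
cube = da.random.standard_normal(
    size=tuple(size * c for c in chunk), chunks=chunk
)

# Mean over all axes, reduced chunk by chunk.
mean = cube.mean().compute()
print(cube.shape, cube.numblocks, mean)
```

This keeps the task graph simpler than the hand-built version, although it does not by itself limit how many chunks are materialized in parallel.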

SultanOrazbayev