
I've run into some memory problems while using dask's LocalCluster. I'm working on a machine with 32 CPUs, but only 64GB of RAM available. I'm instantiating the cluster like this:

import os
from dask.distributed import LocalCluster

cluster = LocalCluster(
    n_workers=os.cpu_count(),
    threads_per_worker=1
)

By default, dask assigns an equal amount of memory to each worker (total RAM divided by the number of workers). I'm using dask to compute research batches, and those batches differ in their memory requirements. There are no problems when I process 32 smaller batches, as they fit into memory. The problem starts when I move to bigger batches that don't fit into the 2GB of RAM assigned to each worker; dask then raises memory allocation errors. I've seen that I can increase the worker's timeout, but that doesn't seem like a very elegant solution. Is there a way to tell dask to keep a scheduled task in the queue until resources are available? What would be the correct way to handle these queued tasks while using LocalCluster?

Piotr Rarus

1 Answer


One option is to explicitly specify resource requirements for the tasks (if you know them in advance); there is a related answer here and in the documentation.

The cluster would be started with the resources={'mem': 2000} option for the workers, and the expected resource use would then be declared when executing the tasks with .compute() or .submit(). For example, small tasks could specify client.submit(my_func, small_task, resources={'mem': 1000}) (at most 2 such tasks will run on a worker at a time), while large tasks would specify client.submit(my_func, large_task, resources={'mem': 2000}) (at most 1 such task will run on a worker at a time).
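For illustration, a minimal sketch of that pattern (process_batch, small_batches and large_batches are hypothetical placeholders, and the 'mem' values are abstract bookkeeping units that dask does not enforce; this assumes LocalCluster forwards the resources keyword to its workers):

import os
from dask.distributed import Client, LocalCluster

# Each worker advertises 2000 abstract units of 'mem'.
cluster = LocalCluster(
    n_workers=os.cpu_count(),
    threads_per_worker=1,
    resources={'mem': 2000},
)
client = Client(cluster)

def process_batch(batch):
    # Placeholder for the actual batch computation.
    return sum(batch)

small_batches = [list(range(i, i + 10)) for i in range(100)]        # hypothetical data
large_batches = [list(range(i, i + 10_000)) for i in range(10)]

# Small tasks claim 1000 units each, so at most two run per worker at once.
small_futures = [client.submit(process_batch, b, resources={'mem': 1000}) for b in small_batches]

# Large tasks claim all 2000 units, so only one runs per worker at a time.
large_futures = [client.submit(process_batch, b, resources={'mem': 2000}) for b in large_batches]

results = client.gather(small_futures + large_futures)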

SultanOrazbayev
  • Thank you for your answer, but I don't find this solution very general. I have hundreds, and later thousands, of batches, so specifying memory requirements for each of them by hand is a bit out of scope. – Piotr Rarus Mar 02 '21 at 17:15
  • Hmm, perhaps you can specify a rule that determines resource requirements of each task in a batch (e.g. based on the size of the object that is passed to the task)... – SultanOrazbayev Mar 02 '21 at 17:19
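A rough sketch of the rule suggested in the last comment, reusing the names from the sketch above (the use of pickled size as a proxy for memory footprint is a hypothetical heuristic, not a dask feature):

import pickle

def mem_units_for(batch, worker_mem=2000):
    # Rough proxy: pickled size in MB, clamped to [1, worker_mem] units of 'mem'.
    size_mb = len(pickle.dumps(batch)) / 1e6
    return min(worker_mem, max(1, int(size_mb)))

futures = [
    client.submit(process_batch, b, resources={'mem': mem_units_for(b)})
    for b in small_batches + large_batches
]
results = client.gather(futures)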