
I have a rather complex Python algorithm that I need to distribute across an HPC cluster.

The code is run from a JupyterHub instance with 60 GB of memory. The PBS cluster is configured with 1 process, 1 core and 30 GB per worker, with nanny=False (the computations won't run otherwise), for a total of 26 workers (about 726 GB of memory in total).
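A quick check of these figures: 26 workers at 30 GB each is nominally 780 GB, so the reported 726 GB is plausibly the same total expressed in binary units (GiB). That reading is an assumption, not something the cluster configuration states.

```python
# Assumption: "30GB" means 30 * 10**9 bytes per worker, and the reported
# total is expressed in GiB (2**30 bytes).
workers = 26
bytes_per_worker = 30 * 10**9

total_gb = workers * bytes_per_worker / 10**9    # decimal gigabytes
total_gib = workers * bytes_per_worker / 2**30   # binary gibibytes

print(f"{total_gb:.0f} GB nominal, {total_gib:.1f} GiB")
# prints "780 GB nominal, 726.4 GiB"
```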

I do not need to fetch any data back, since everything that is needed is written to disk at the end of the computations. Note that each computation takes about 7 minutes when run independently.

The problem I run into is the following: each individual worker (job name: dask-worker) seems to run fine; it has about 20 GB available, of which at most 5 GB is used. But whenever I try to launch more than about 50 jobs, the central worker (job name: jupyterhub) runs out of memory after about 20 minutes.

Here is how I distribute the computations:

def complex_python_func(params):
    return compute(params=params).run()

Then I tried to use client.map or delayed as follows:

list_of_params = [1, 2, 3, 4, 5, ... n] # with n > 256

# With delayed
lazy = [dask.delayed(complex_python_func)(l) for l in list_of_params]
futures = client.compute(lazy)
# Or with map
chain = client.map(complex_python_func, list_of_params)

Here is the configuration of the cluster:

from dask_jobqueue import PBSCluster

# queue, env_extra and python_bin are defined elsewhere
cluster = PBSCluster(
    cores=1,
    memory="30GB",
    interface="ib0",
    queue=queue,
    processes=1,
    nanny=False,
    walltime="12:00:00",
    shebang="#!/bin/bash",
    env_extra=env_extra,
    python=python_bin,
)
cluster.scale(32)

I can't understand why this does not work. I would expect Dask to run each computation and then release the memory (roughly every 6 to 7 minutes for each individual task). I check the memory usage of the worker with qstat -f jobId, and it keeps increasing until the worker is killed.

What is causing the jupyterhub worker to fail, and what would be a good (or at least a better) way of achieving this?

Mike

1 Answer


Two potential leads are:

  1. If the workers are not expected to return anything, then it might be worth changing the return statement to return None (it's not clear what compute() does in your script):
def complex_python_func(params):
    compute(params=params).run()
    return None
  2. It's possible that Dask assigns more than one task per worker and that at some point a worker has more tasks than it can handle. One way out of this is to limit the number of tasks that a worker can take at any given time with resources, e.g. using:
# add resources when creating the cluster
cluster = PBSCluster(
    # all other settings are unchanged, but add this line to give each worker
    # one unit of an abstract resource named foo
    extra=['--resources foo=1'],
)

# rest of code skipped, but make sure to specify resources needed by task
# when submitting it for computation
lazy = [dask.delayed(complex_python_func)(l) for l in list_of_params]
futures = client.compute(lazy, resources={'foo': 1})
# Or with map
chain = client.map(complex_python_func, list_of_params, resources={'foo': 1})

For more information on resources, see the documentation or this related question: "Specifying Task Resources: Fractional gpu".
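The first lead can also be applied without editing the function body. The sketch below wraps any callable so that it runs for its side effects and returns None. Note that discard_result is a hypothetical helper name, not part of Dask, and the commented-out submission lines assume the client, complex_python_func and list_of_params from the question.

```python
from functools import wraps


def discard_result(func):
    """Return a wrapper that calls func and always returns None."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        func(*args, **kwargs)   # run for side effects (e.g. writing to disk)
        return None             # nothing to send back to the client
    return wrapper


# Hypothetical usage with the names from the question (needs a live cluster):
# chain = client.map(discard_result(complex_python_func), list_of_params,
#                    resources={'foo': 1})
```

This keeps the original function untouched, which may matter if the code is maintained by someone else.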

SultanOrazbayev
  • Thanks, your suggestion did help a little. However, the scheduler still runs out of memory at some point. When checking complex_python_func, I found that it contains some dask.delayed calls and one dask.compute call. After checking memory usage, it seems that this dask.compute is the reason the scheduler fails. Maybe I should try to work on this (but this code is not mine, so I might not be able to change anything). – Mike Feb 16 '21 at 12:11
  • I have tried your suggestion to constrain resources, but writing chain = client.map(complex_python_func, list_of_params, resources={'CPU': 1}) stalls the execution. – Mike Feb 16 '21 at 12:12
  • Did you provide resources when starting the cluster? – SultanOrazbayev Feb 16 '21 at 12:13
  • The nested `delayed` inside `complex_python_func` is problematic: https://docs.dask.org/en/latest/delayed-best-practices.html#avoid-calling-delayed-within-delayed-functions – SultanOrazbayev Feb 16 '21 at 12:13
  • Oh, you're right, I mistyped the resources when starting the cluster. This works now with CPU=1. Concerning the nested delayed, I wonder what I could do. The complex code requires delayed to optimise its computations, but I really need to be able to distribute it on the cluster. – Mike Feb 16 '21 at 12:38
  • Option 1 (best): refactor to make optimal use of compute power; option 2 (second best): launch it serially from Python (so no nested calls), which will lead to some resource underutilisation but takes little effort and carries a low risk of new bugs; option 3: use an external workflow manager, such as snakemake, sbatch or just a simple bash script that loops over specific files, to launch separate cluster instances for each parameter (probably using a smaller cluster, but it depends on `complex_python_func`). – SultanOrazbayev Feb 16 '21 at 12:48
  • Returning None and adapting the cluster resources helps to run the method safely – Mike Feb 16 '21 at 13:30
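
Options 2 and 3 from the comments share one idea: handle one parameter at a time so that the inner dask.compute never runs inside an outer Dask task. A minimal sketch of that idea, assuming a hypothetical script run_one.py that calls complex_python_func on the parameter passed on its command line:

```python
import subprocess
import sys


def build_command(param, script="run_one.py"):
    """Build the command line for one parameter (run_one.py is hypothetical)."""
    return [sys.executable, script, str(param)]


def run_serially(params, script="run_one.py", runner=subprocess.run):
    """Run one fresh Python process per parameter, one after another."""
    for param in params:
        # check=True raises CalledProcessError if a run exits with an error
        runner(build_command(param, script), check=True)


# Hypothetical usage:
# run_serially(list_of_params)
```

Each run starts in a fresh process, so its memory is handed back to the system when it exits. The cost is the resource underutilisation mentioned in the comments.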