
Currently, I use dask_jobqueue to parallelize my code, and I have difficulty setting up a cluster quickly when the number of workers is large.

When I scale up the number of workers (say more than 2000), it takes more than 15 mins for the cluster to be ready. Given that I request so many workers here, I understand it takes some time, but I wonder if there is a way to speed it up.

(I am also trying to release the GIL so that I can use threading, but I have not been successful yet. Here I would like to know whether there is a way to speed up the setup without using threading.)

My cluster setup looks like this:


from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=256,
    memory='252 GB',
    interface='ib0',
    processes=128,
    death_timeout='2000',  # the default 60 s is too short and workers would die
    walltime='01:00:00',
    job_extra=['--partition=<****>'],
    local_directory='/tmp',  # use the compute nodes' local storage
    log_directory='<****>',
    scheduler_options={"dashboard_address": ":8786"},
    env_extra=[
        'module purge',
        'module load Compilers/GCC/8.2.0 MPI/EPYC/OpenMPI/4.0.2/GCC/8.2.0 Python/3.7.6 Anaconda/3.7',
        'export MKL_NUM_THREADS=1',
    ],
)

cluster.job_cls.submit_command = 'ssh <login node> sbatch'  # I am not allowed to submit a job from a compute node

This generates a script like the following:

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e <local_log>/dask-worker-%J.err
#SBATCH -o <local_log>/dask-worker-%J.out
#SBATCH -n 1
#SBATCH --cpus-per-task=256
#SBATCH --mem=235G
#SBATCH -t 01:00:00
#SBATCH --partition=<***>
module purge
module load Compilers/GCC/8.2.0 MPI/EPYC/OpenMPI/4.0.2/GCC/8.2.0 Python/3.7.6 Anaconda/3.7
export MKL_NUM_THREADS=1
echo $PATH

<python> -m distributed.cli.dask_worker tcp://***.**.***.**:<port> --nthreads 2 --nprocs 128 --memory-limit 1.83GiB --name dummy-name --nanny --death-timeout 2000 --local-directory /tmp --interface ib0 --protocol tcp://

Then I scale the cluster up to 20 nodes:

cluster.scale(128*20)

This takes more than 15 mins for the cluster to be ready.

With 128 workers (1 node), it takes about a minute.

distributed.yaml has the following setup.

distributed:

  worker:
    multiprocessing-method: fork  # speeds things up a bit
    use-file-locking: True        # did not change the time

  admin:
    tick:
      limit: 30s  # otherwise I get many warnings in the log files
Yuki

1 Answer

It's not ideal, but when working with such large numbers of workers, it might be easier to request the workers externally (not using dask_jobqueue). The workflow here would be a bash script that submits the jobs which start the individual workers.
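A minimal sketch of that submission side, assuming a hypothetical SLURM script start_worker.sh that launches the dask-worker processes for one node (the ssh prefix mirrors the submit_command workaround from the question):

import subprocess

# hypothetical job script that starts the dask-worker processes for one node
worker_script = 'start_worker.sh'

# submit one worker job per node, bypassing dask_jobqueue for the submission step
for _ in range(20):
    subprocess.run(['ssh', '<login node>', 'sbatch', worker_script], check=True)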

The workers will need to know where to connect, and one way of letting them know is to write the contents of client.scheduler_info() to a JSON file and tell the workers to use that file for the scheduler connection details (--scheduler-file my_file.json).
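On the scheduler side, a sketch of that wiring could look as follows (my_file.json is just a placeholder path on a shared filesystem; Client.write_scheduler_file dumps client.scheduler_info() to JSON for you):

from dask.distributed import Client

client = Client(cluster)  # or a Client connected to a standalone scheduler

# write the scheduler's connection info to a location all workers can read
client.write_scheduler_file('my_file.json')

# each externally submitted worker then connects via that file, e.g. inside start_worker.sh:
#   python -m distributed.cli.dask_worker --scheduler-file my_file.json \
#       --nthreads 2 --nprocs 128 --interface ib0 --local-directory /tmp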

SultanOrazbayev