I am running multiple parallel tasks on a multi-node distributed Dask cluster. However, once the tasks finish, the workers still hold a large amount of memory and the cluster fills up quickly.
I have tried calling `client.restart()` after every task, and also `client.cancel(df)`. The first kills all workers and sends a `CancelledError` to other running tasks, which is troublesome; the second did not help much, because we use a lot of custom objects and functions inside Dask's `map` calls. Adding `del` for the known variables followed by `gc.collect()` also doesn't help much.
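For context, here is a minimal reproduction of the cleanup steps I described, on a throwaway local cluster (the workload itself is illustrative, not my real code):

```python
import gc

from dask.distributed import Client, LocalCluster

# Throwaway in-process cluster just to demonstrate the cleanup pattern.
cluster = LocalCluster(n_workers=2, processes=False)
client = Client(cluster)

# Stand-in for the real workload.
futures = client.map(lambda x: x * 2, range(100))
results = client.gather(futures)

# Attempted cleanup: cancel the futures, drop references, force a collection.
client.cancel(futures)
del futures
gc.collect()

# client.restart() would also free the memory, but it kills every worker
# and raises CancelledError in any tasks still running elsewhere.
client.close()
cluster.close()

print(results[:3])  # [0, 2, 4]
```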
I am fairly sure most of the held memory is due to the custom Python functions and objects invoked with `client.map(..)`.
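The shape of that workload looks roughly like this (the `FeatureExtractor` class here is a hypothetical stand-in for our custom objects):

```python
from dask.distributed import Client


class FeatureExtractor:
    """Hypothetical stand-in for the custom objects shipped to workers."""

    def __init__(self, scale):
        self.scale = scale

    def __call__(self, row):
        return row * self.scale


client = Client(processes=False)  # in-process cluster, for the sketch only

# A callable instance is mapped over the data; each worker deserializes
# and holds a copy of the object while the tasks run.
extractor = FeatureExtractor(scale=10)
futures = client.map(extractor, range(5))
out = client.gather(futures)
client.close()

print(out)  # [0, 10, 20, 30, 40]
```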
My questions are:
- Is there a way, from the command line or otherwise, to do something like "trigger a worker restart if no tasks are running right now"?
- If not, what are the possible solutions to this problem? Avoiding custom objects and pure Python functions inside Dask tasks is not an option for me.
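To make the first question concrete, this is the kind of check I have in mind, sketched against the `distributed` client API (assuming `client.processing()`, which maps each worker address to the tasks it is currently running, is the right way to detect idleness):

```python
from dask.distributed import Client


def cluster_is_idle(client):
    """Return True if no worker is currently processing any task."""
    processing = client.processing()  # {worker address: tasks in flight}
    return all(len(tasks) == 0 for tasks in processing.values())


client = Client(processes=False)  # in-process cluster, for the sketch only
idle = cluster_is_idle(client)    # nothing submitted, so this is True

if idle:
    # A restart would be safe at this moment; client.restart() needs
    # nanny-managed workers, so it is only indicated here, not called.
    pass

client.close()
print(idle)
```

Ideally something like this would run periodically (or be triggerable from the command line) so that workers are only recycled when they are guaranteed to have nothing in flight.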