
Intro

I am parallelising some code using dask.distributed (an embarrassingly parallel task).

  • I have a list of Paths pointing to different images that I scatter to the workers.
  • Each worker loads an image (a 3D stack) and runs some 3D filtering with scipy, which creates intermediate outputs.
  • Each filtered image is saved to disk as .npy and/or .png.
  • I am testing locally before running on a cluster, and my setup is:


from dask.distributed import Client, LocalCluster

# local test setup: 2 workers, 1 thread each, 8 GB memory limit per worker
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit=8e9)
client = Client(cluster)
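
For context, the per-image workflow looks roughly like the sketch below; process_image, the gaussian_filter call and the "data" glob are illustrative placeholders, not the actual code.

from pathlib import Path

import numpy as np
from scipy import ndimage

def process_image(path: Path) -> str:
    # load the 3D stack, run some 3D filtering with scipy, save the result to disk
    stack = np.load(path)
    filtered = ndimage.gaussian_filter(stack, sigma=2)
    out_path = path.with_name(path.stem + "_filtered.npy")
    np.save(out_path, filtered)
    return str(out_path)

image_paths = sorted(Path("data").glob("*.npy"))    # one entry per image
futures = client.map(process_image, image_paths)    # one task per image
results = client.gather(futures)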

Issue:

  • When I process only two images (one image per worker), everything is fine.
  • When I scatter more than one image per worker, I get warnings like the one below, in which the reported process memory keeps increasing.


distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.
Perhaps some other process is leaking memory?  Process memory: 6.21 GB -- Worker memory limit: 8.00 GB

This suggests that part of the RAM used by the worker is not freed between files (I guess these are leftover filtering intermediates...).

Question

Is there a way to free the memory of the worker before it starts processing the next image? Do I have to run a garbage-collection cycle between tasks?

Edit

I included a gc.collect() call at the end of the function run by the workers, but it didn't eliminate the warnings.
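
Concretely, the call sits at the end of the function each task runs, roughly like this (process_image is the placeholder name from the sketch above):

import gc

def process_image(path):
    # (the real code loads, filters and saves the image here)
    gc.collect()   # explicit garbage-collection cycle before the task returns
    return str(path)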

Thanks a lot for the help!


2 Answers


As long as a client holds a reference to a distributed value, the cluster won't purge it from memory. This is expounded on in the Managing Memory documentation, specifically the "Clearing data" section.
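
As a minimal sketch (the futures/results names are placeholders for whatever your workflow produces; the calls themselves are standard distributed.Client methods):

results = client.gather(futures)   # copy the results you need back to the client
del futures                        # drop the client-side references so workers can release the data

# or cancel the futures explicitly:
# client.cancel(futures)

# last resort: restart the workers for a completely clean slate
# client.restart()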


The "Memory use is high" error message could be pointing to a few potential culprits. I found this article by one of the core Dask maintainers helpful in diagnosing and fixing the issue.

Quick summary, either:

  1. Break your data into smaller chunks.
  2. Manually trigger garbage collection and/or tweak the gc settings on the workers through a Worker Plugin (which the OP has tried without success; I'll include it anyway for other readers).
  3. Trim memory using malloc_trim, especially if working with non-NumPy data or small NumPy chunks; see the sketch after this list.
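
Here is a minimal sketch of option 3, following the manual-trim recipe from the Dask worker memory docs; it assumes Linux with glibc, where malloc_trim lives in libc.so.6:

import ctypes

def trim_memory() -> int:
    # ask glibc to return freed, unused heap memory to the operating system
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

client.run(trim_memory)   # execute on every worker in the cluster

The same docs also mention setting the MALLOC_TRIM_THRESHOLD_ environment variable before starting the workers, which makes this trimming happen automatically.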

Make sure you can see the Dask Dashboard while your computations are running to figure out which approach is working.
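
For a LocalCluster, the dashboard address can be printed straight from the client:

print(client.dashboard_link)   # usually something like http://127.0.0.1:8787/status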
