
I have a 37 GB .npy file that I would like to convert to a Zarr store so that I can include coordinate labels. I have code that should do this, but I keep running out of memory. I am using Dask in between to process the data in chunks, yet I still run out of memory.

The data is "thickness maps" for people's femoral cartilage. Each map is a 310x310 float array, and there are 47789 of these maps. So the data shape is (47789, 310, 310).

Step 1: Load the npy file as a memmapped Dask array.

import numpy as np
import dask.array

fem_dask = dask.array.from_array(np.load('/Volumes/T7/cartilagenpy20220602/femoral.npy', mmap_mode='r'),
                                 chunks=(300, -1, -1))

Step 2: Make an xarray DataArray over the Dask array, with the desired coordinates. I have several coordinates for the 'map' dimension that come from metadata (a pandas dataframe).

import xarray as xr

# metadata is a pandas DataFrame with one row per map
fem_xr = xr.DataArray(fem_dask, dims=['map', 'x', 'y'],
                      coords={'patient_id': ('map', metadata['patient_id']),
                              'side':       ('map', metadata['side'].astype(np.string_)),
                              'timepoint':  ('map', metadata['timepoint'])
                             })


Step 3: Write to Zarr.

fem_ds = fem_xr.to_dataset(name='femoral')  # Zarr requires Dataset, not DataArray
res = fem_ds.to_zarr('/Volumes/T7/femoral.zarr',
                     encoding={'femoral': {'dtype': 'float32'}},
                     compute=False)  # compute=False returns a dask Delayed; nothing is written yet
res.visualize()

(Task graph from res.visualize() not shown.)

When I call res.compute(), RAM use quickly climbs out of control. The other Python processes, which I think are the Dask workers, seem to be inactive at first, but a bit later they become active: one of those Python processes is using 20 GB of RAM and another 36 GB, which the Dask dashboard also confirms. (Activity Monitor and Dask dashboard screenshots not shown.)

Eventually all the workers get killed and the task errors out. How can I do this in an efficient way that correctly uses Dask, xarray, and Zarr, without running out of RAM (or melting the laptop)?

thomaskeefe
  • the really short answer is that you're creating a single reader into the numpy array, which forms a bottleneck for all of the downstream tasks. As far as dask knows, all tasks depend on loading the result of `np.load`. This is very different from how a distributed read job such as [`dask.array.from_npy_stack`](https://docs.dask.org/en/stable/generated/dask.array.from_npy_stack.html#dask.array.from_npy_stack) works. To get around that, check out [this answer](https://stackoverflow.com/a/53363533/3888719) - you want to create a distributed job where each worker opens the data themselves. – Michael Delgado Jun 17 '22 at 21:56
  • it seems the dask devs think this should work, but I see the same behavior as you on 2022.04.0. I'm raising this here: https://github.com/dask/dask/issues/4837 – Michael Delgado Jun 18 '22 at 02:35
  • Actually ended up posting here: better fit. https://github.com/dask/dask/issues/5985#issuecomment-1159347331 – Michael Delgado Jun 19 '22 at 19:07
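
A minimal sketch of the from_npy_stack pattern referenced in the first comment, assuming the maps had been written out as a directory of per-chunk .npy files rather than one monolithic file (the stack directory path below is hypothetical); each dask task then memory-maps only its own chunk:

import dask.array as dda

# Hypothetical directory of per-chunk .npy files (e.g. produced earlier with
# dask.array.to_npy_stack); each task opens only the chunk it needs.
fem_dask = dda.from_npy_stack('/Volumes/T7/femoral_npy_stack/', mmap_mode='r')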

1 Answer


using threads

If the dask workers can share threads, your code should just work. If you don't initialize a dask Cluster explicitly, dask will create one with default args, which use processes. This results in the behavior you're seeing, most likely because the memory-mapped array embedded in the task graph has to be serialized and shipped to each worker process, which materializes it in memory. To solve this, explicitly create a cluster that uses threads:

import numpy as np
import dask.array as dda
import dask.distributed

# use threads, not processes
cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)

arr = np.load('myarr.npy', mmap_mode='r')  # memory-mapped; nothing is read yet
da = dda.from_array(arr).rechunk(chunks=(100, 310, 310))
da.to_zarr('myarr.zarr', mode='w')

using processes or distributed workers

If you're using a cluster that cannot share threads, such as a JobQueue cluster, KubernetesCluster, etc., you can use the following to read the npy file, assuming it's on a networked filesystem or otherwise available to all workers.

Here's a workflow that creates an empty dask array matching the memory map, then maps the read job over it using dask.array.map_blocks. The key is the optional block_info keyword, which gives the location of each block within the array; we can use it to slice new mmap objects on the dask workers:

import numpy as np
import dask.array as dda

def load_npy_chunk(da, fp, block_info=None, mmap_mode='r'):
    """Load a slice of the .npy array, making use of the block_info kwarg"""
    # `da` is the (empty) block handed in by map_blocks; only its location is needed
    np_mmap = np.load(fp, mmap_mode=mmap_mode)
    array_location = block_info[0]['array-location']
    dim_slicer = tuple(slice(*loc) for loc in array_location)
    return np_mmap[dim_slicer]

def dask_read_npy(fp, chunks=None, mmap_mode='r'):
    """Read metadata by opening the mmap, then send the read job to workers"""
    np_mmap = np.load(fp, mmap_mode=mmap_mode)
    da = dda.empty_like(np_mmap, chunks=chunks)
    return da.map_blocks(load_npy_chunk, fp=fp, mmap_mode=mmap_mode, meta=da)

This works for me on a demo of the same shape (you could add the xarray.DataArray creation/formatting step at the end, as sketched after the demo, but the dask ops work fine and worker memory stays below 1 GB for me):

import numpy as np, dask.array as dda, xarray as xr, pandas as pd, dask.distributed

### insert/import above functions here

# save a large numpy array (~18 GB as float32; the question's float64 file is ~37 GB)
np.save('myarr.npy', np.empty(shape=(47789, 310, 310), dtype=np.float32))

cluster = dask.distributed.LocalCluster()  # default (process-based) workers
client = dask.distributed.Client(cluster)

da = dask_read_npy('myarr.npy', chunks=(300, -1, -1), mmap_mode='r')
da.to_zarr('myarr.zarr', mode='w')
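
For the xarray.DataArray creation/formatting step mentioned above, here is a hedged sketch that follows the question's own code; `metadata` is assumed to be the question's pandas dataframe, with one row per map in the same order as the array:

# `metadata` is an assumption: the question's pandas dataframe, one row per map
fem_xr = xr.DataArray(da, dims=['map', 'x', 'y'],
                      coords={'patient_id': ('map', metadata['patient_id']),
                              'side':       ('map', metadata['side'].astype(np.string_)),
                              'timepoint':  ('map', metadata['timepoint'])})
fem_ds = fem_xr.to_dataset(name='femoral')  # to_zarr needs a Dataset, not a DataArray
fem_ds.to_zarr('/Volumes/T7/femoral.zarr',
               encoding={'femoral': {'dtype': 'float32'}},
               mode='w')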
Michael Delgado
    Simply switching from procs to threads on my LocalCluster didn't seem to help. In fact, I tried your MRE with dummy data and it still just got stuck on the .to_zarr(...). However, the workflow with map_blocks worked great. – thomaskeefe Jun 18 '22 at 15:12
  • Poking around in the dask issues it does seem there’s been recent work on this so it’s possible you need to update dask for the first one to work. But the second one seems like a great option too so glad it helped! – Michael Delgado Jun 18 '22 at 16:08