
I have a long list of .zarr arrays, that I would like to merge into a single array and write to disk.

My code approximately looks as follows:

import dask.array
import zarr
import os

local_paths = ['parts/X_00000000.zarr',
 'parts/X_00000001.zarr',
 'parts/X_00000002.zarr',
 'parts/X_00000003.zarr',
 'parts/X_00000004.zarr',
 'parts/X_00000005.zarr',
 ...]

result_path = "testtest"
os.makedirs(result_path)

# Lazily open each part, concatenate along the first axis, rechunk, and write out.
Xs = [dask.array.from_zarr(zarr.DirectoryStore(p)) for p in local_paths]
X = dask.array.concatenate(Xs, axis=0)
X = X.rechunk({0: 10000, 1: -1, 2: -1, 3: -1})
dask.array.to_zarr(X, zarr.DirectoryStore(result_path))

Each of the arrays in local_paths contains a stack of 64x64 RGB images. These stacks all have different lengths, so the shape of the first might be (100, 64, 64, 3) and the shape of the second might be (200, 64, 64, 3).
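For illustration, toy parts with this layout could be generated roughly like this (the number of parts, their sizes and the uint8 dtype are made-up assumptions):

import numpy as np
import zarr

# Hypothetical toy parts: stacks of random 64x64 RGB images of different lengths.
for i, n in enumerate([100, 200, 150]):
    part = np.random.randint(0, 255, size=(n, 64, 64, 3), dtype=np.uint8)
    zarr.save(f"parts/X_{i:08d}.zarr", part)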

Executing the last line of this code causes my memory to be entirely consumed, and then the Jupyter notebook crashes (without giving me an error message or an exception).

In order to investigate the problem, I visualized the task graph, replacing the last line with the following two lines:

k = dask.array.to_zarr(X, zarr.DirectoryStore(result_path), compute=False)
k.visualize()

It is very huge (link), so I took screenshots of only two interesting pieces of it:

[screenshot: a repeating rechunk/store pattern in the task graph]

This structure repeats throughout the graph: Dask takes the output of the concatenation, redistributes the data, and then tries to store it. Notice the thick black bar, which is the result of many overlapping transitions.

Now look where these transitions come from:

[screenshot: the create node and its transitions to the store nodes]

Look at the create node in the middle. I assume that this is the part of the graph where the zarr DirectoryStore is created. The predecessor of the create node has a transition to every store node!

Here is my guess why dask runs out of memory: it tries to resolve all the rechunking and merging first, and by the time it should create the DirectoryStore, there is no memory left. And none of the store nodes can be executed, because the create node is a precondition for each of them.

Is my assumption true? If yes, what can I do to force dask to create the DirectoryStore first? If not, what else could be the problem that makes me run out of memory?
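For illustration, this is roughly what I mean by creating the store first: pre-create the target zarr array, then store every part into its own region, so each write is a separate, small graph (a sketch only; the uint8 dtype and the fixed chunking are my assumptions):

import dask.array as da
import zarr

# Open the parts lazily; only metadata is read here.
parts = [da.from_zarr(zarr.DirectoryStore(p)) for p in local_paths]
lengths = [p.shape[0] for p in parts]

# Pre-create the target array so the "create" step happens up front.
out = zarr.open(zarr.DirectoryStore(result_path), mode='w',
                shape=(sum(lengths), 64, 64, 3),
                chunks=(10000, 64, 64, 3), dtype='uint8')

# Write each part into its slice of the output; every call builds and runs a small graph.
offset = 0
for part, n in zip(parts, lengths):
    da.store(part, out, regions=(slice(offset, offset + n),))
    offset += n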

UPDATE: When I am using dask.config.set(scheduler='single-threaded'), the creation of the DirectoryStore (the create node) is not the problem. I just looked at the output directory and some files have already been written. So it has to be the task graph itself that is too large.
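To check this, the number of tasks in the graph can be counted on the delayed object from above (a sketch; k is the result of to_zarr with compute=False):

k = dask.array.to_zarr(X, zarr.DirectoryStore(result_path), compute=False)
print(len(dict(k.__dask_graph__())))  # total number of tasks dask would have to schedule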


  • You might be interested in the extended conversation in https://github.com/dask/distributed/issues/2602#issuecomment-871801726 – Josh Jul 09 '21 at 07:45
  • Is there any particular reason why you want to use `dask` to concatenate your `zarr` arrays? Because in the end you use `dask` only for concatenating your arrays. In the documentation it says `zarr` has an `append` function. Maybe you can try [append](https://zarr.readthedocs.io/en/stable/tutorial.html#resizing-and-appending) to keep it a pure `zarr` solution (a sketch of this idea follows these comments). – Siddhant Tandon Jul 10 '21 at 18:23
  • I am not an expert, but looking at `dask`'s documentation [here](https://docs.dask.org/en/latest/array-api.html#dask.array.to_zarr), it seems that the default value is `compute=True`. Try setting it to `False` so the command only builds the graph, and choose another way of computing it later if needed. – Bilal Qandeel Jul 13 '21 at 01:00
  • Late to the party, but does it help if you specify `chunks=None` in [`dask.array.from_zarr`](https://docs.dask.org/en/stable/generated/dask.array.from_zarr.html)? In my experience zarr’s defaults create a huge task multiplier by using much too small chunks which can explode your scheduler if inside a larger dask workflow. I’m not as familiar with the dask array version but this is certainly a problem when reading zarr arrays to xarray within dask tasks. – Michael Delgado May 26 '22 at 15:24
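Following Siddhant Tandon's suggestion, a pure-zarr version of the merge might look roughly like this (a sketch only; it assumes all parts share the same dtype and trailing dimensions, and it loads one part into memory at a time):

import zarr

# Open the first part only to copy its dtype and trailing dimensions.
first = zarr.open(local_paths[0], mode='r')

# Create an empty target array and grow it by appending one part at a time.
out = zarr.open(result_path, mode='w',
                shape=(0,) + first.shape[1:],
                chunks=(10000,) + first.shape[1:],
                dtype=first.dtype)
for p in local_paths:
    out.append(zarr.open(p, mode='r')[:])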
