
I'm trying to rechunk a NetCDF file collection and create a Zarr dataset on AWS S3. I have 168 original NetCDF4 classic files with arrays of dimension time: 1, y: 3840, x: 4608 chunked as chunks={'time':1, 'y':768, 'x':922}.

I want to write this output to Zarr, and I want to optimize for time series extraction, so I'd like more time records in each chunk. I figured I would use xarray to help get the job done, since I have a lot of processors available to take advantage of Dask, and xarray has both xr.open_mfdataset and ds.to_zarr.

I first tried rechunking to chunks={'time':24, 'y':768, 'x':922} to match the input NetCDF4 chunking in x and y, but when I tried to write to Zarr it complained because it needed uniform chunk sizes in both x and y, only allowing a non-uniform size in the last chunk along the time dimension (and unfortunately, in the x dimension, the total size of 4608 is not a multiple of the chunk size 922).
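Chunk sizes that divide the dimension lengths evenly avoid that complaint (for example, 3840/384 == 10 and 4608/288 == 16), so a rechunk along those lines would look roughly like this (just a sketch, keeping 24 time steps per chunk as in my first attempt):

# hypothetical rechunk: 384 divides 3840 and 288 divides 4608 exactly,
# so every Zarr chunk along y and x is the same size
ds = ds.chunk({'time': 24, 'y': 384, 'x': 288})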

So then I tried chunks={'time':168, 'y':384, 'x':288}. That started working and proceeded very quickly for a few minutes, then got slower and slower; eventually, after 50 minutes, the cluster died with:

distributed.core - INFO - Event loop was unresponsive in Worker for 1.41s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
slurmstepd: error: Step 3294889.0 exceeded memory limit (25346188 > 25165824), being killed

Here's the code I'm using:

from dask.distributed import Client

import pandas as pd
import xarray as xr
import s3fs
import zarr

client = Client(scheduler_file='/home/rsignell/scheduler.json')
client

(screenshot of the Dask Client summary)

root = '/lustre/projects/hazards/cmgp/woodshole/rsignell/nwm/forcing_short_range/' 

bucket_endpoint='https://s3.us-west-1.amazonaws.com/'

f_zarr = 'rsignell/nwm/test_week4'

dates = pd.date_range(start='2018-04-01T00:00', end='2018-04-07T23:00', freq='H')

urls = ['{}{}/nwm.t{}z.short_range.forcing.f001.conus.nc'.format(root,a.strftime('%Y%m%d'),a.strftime('%H')) for a in dates]

ds = xr.open_mfdataset(urls, concat_dim='time', chunks={'time':1, 'y':768, 'x':922})
ds = ds.drop(['ProjectionCoordinateSystem','time_bounds'])
ds = ds.chunk(chunks={'time':168, 'y':384, 'x':288}).persist()
ds

producing

<xarray.Dataset>
Dimensions:         (reference_time: 168, time: 168, x: 4608, y: 3840)
Coordinates:
  * reference_time  (reference_time) datetime64[ns] 2018-04-01 ...
  * x               (x) float64 -2.304e+06 -2.303e+06 -2.302e+06 -2.301e+06 ...
  * y               (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 -1.917e+06 ...
  * time            (time) datetime64[ns] 2018-04-01T01:00:00 ...
Data variables:
    T2D             (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    LWDOWN          (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    Q2D             (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    U2D             (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    V2D             (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    PSFC            (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    RAINRATE        (time, y, x) float32 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    SWDOWN          (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>

Then I call

fs = s3fs.S3FileSystem(anon=False, client_kwargs=dict(endpoint_url=bucket_endpoint))
d = s3fs.S3Map(f_zarr, s3=fs)

compressor = zarr.Blosc(cname='zstd', clevel=3, shuffle=2)
encoding = {vname: {'compressor': compressor} for vname in ds.data_vars}

delayed_store = ds.to_zarr(store=d, mode='w', encoding=encoding, compute=False)
persist_store = delayed_store.persist(retries=100)

and just before it dies, the Dask Dashboard looks like this:

(two screenshots of the Dask Dashboard just before the workers are killed)

The total size of the NetCDF4 files is 20GB, so it seems a bit crazy that I've got over 500GB showing in the Dask Dashboard, and that 30 processors each with 60GB RAM is not sufficient for the job.
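For scale, a rough back-of-envelope of the decoded, uncompressed in-memory size, using the dtypes listed above (seven float64 variables and one float32, each of shape (168, 3840, 4608)):

elements = 168 * 3840 * 4608              # ~2.97e9 values per variable
nbytes = 7 * elements * 8 + elements * 4  # seven float64 vars + one float32
print(nbytes / 1e9)                       # roughly 178 GB, before any Dask overhead

so the 20GB on disk is presumably heavily packed/compressed relative to what has to live in memory once decoded.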

What am I doing wrong, or what would be a better solution?

Rich Signell
  • What is your memory usage after the completion of the first dataset `.persist()` ? – mdurant Apr 20 '18 at 14:00
  • Can you try persisting the full dataset prior to the `to_zarr` step? – jhamman Apr 20 '18 at 19:26
  • I do use `.persist()` on the dataset load above, and actually, this is where it crashes with `slurmstepd: error: Step 3295169.0 exceeded memory limit (126726200 > 125829120), being killed slurmstepd: error: *** STEP 3295169.0 ON n3-86 CANCELLED AT 2018-04-23T10:33:50 *** slurmstepd: error: Exceeded job memory limit` – Rich Signell Apr 23 '18 at 17:04

1 Answer


I notice that you say you want to increase the number of chunks in the time dimension. Or maybe I've misunderstood.

You start with chunks specified as chunks={'time':1, 'y':768, 'x':922}, but then try chunks={'time':168, 'y':384, 'x':288} and find that the second one uses a large amount of memory.

The problem is that the chunks keyword specifies the size of the chunks, not the number of chunks!

In the first case the size of each chunk is 1*768*922 ~ 7e5 elements, whereas in the second case the size of each chunk is 168*384*288 ~ 2e7 elements.
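A quick sanity check of the per-chunk element counts in plain Python, using the numbers above:

small = 1 * 768 * 922       # ~7.1e5 elements per chunk
large = 168 * 384 * 288     # ~1.9e7 elements per chunk
print(large / small)        # roughly 26x more data per chunk in the second case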

The maximum number of chunks in time is achieved by chunks={'time': 1}.

Charles