
Suppose I have a directory with thousands of GRIB files. I want to load those files into a dask array so I can query them. How can I go about doing this? The attempt below seems to work, but it requires opening every GRIB file up front, takes a long time to run, and uses all of my memory. There must be a better way.

My attempt:

import dask.array as da
from dask import delayed
import gdal
import glob
import os


def load(filedir):
    files = sorted(glob.glob(os.path.join(filedir, '*.grb')))
    data = [da.from_array(gdal.Open(f).ReadAsArray(), chunks=[500,500,500], name=f) for f in files]
    return da.stack(data, axis=0)

file_dir = ...
array = load(file_dir)
  • Have you tried xarray, which is built on dask and now has GRIB support available via PyNIO? If you want to analyse the data quickly, though, you should rewrite it to HDF5 or NetCDF4; with proper chunking any further analysis becomes very easy. – kakk11 May 11 '17 at 13:42
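For reference, here is a minimal sketch of the xarray route suggested in the comment above, assuming PyNIO is installed and that the files share a common grid; the directory path and the concat_dim name are placeholders, not from the original post.

import glob
import os

import xarray as xr

file_dir = '/path/to/grib/files'  # placeholder
files = sorted(glob.glob(os.path.join(file_dir, '*.grb')))

# Open every file lazily as a single dask-backed dataset, concatenated
# along a "time" dimension, then optionally rewrite it to NetCDF4 so
# later analyses can rely on proper chunking.
ds = xr.open_mfdataset(files, engine='pynio', concat_dim='time')
ds.to_netcdf(os.path.join(file_dir, 'combined.nc'))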

1 Answer


The best way to do this would be to use dask.delayed. In this case, you'd create a delayed function to read each array, turn each of the resulting delayed objects into a dask array with da.from_delayed, and then stack them. Something along the lines of:

import dask
import dask.array as da
import gdal  # or: from osgeo import gdal on newer installs


# This function isn't run until compute time
@dask.delayed(pure=True)
def load(file):
    return gdal.Open(file).ReadAsArray()


# Create several delayed objects, then turn each into a dask
# array. Note that you need to know the shape and dtype of each
# file
data = [da.from_delayed(load(f), shape=shape_of_f, dtype=dtype_of_f)
        for f in files]

x = da.stack(data, axis=0)

Note that this makes a single task for loading each file. If the individual files are large, you may want to chunk them yourself inside the load function. I'm not familiar with gdal, but from a brief look at the ReadAsArray method this looks doable with the xoff/yoff/xsize/ysize parameters. You'd have to write this code yourself, but it may be more performant for large files.
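As a rough illustration of that idea (this is a sketch, not part of the original answer): the helper below reads each file in windows via ReadAsArray's xoff/yoff/xsize/ysize arguments and stitches the tiles back together with da.concatenate. The tile size and the helper names are made up, and it assumes multi-band files so that ReadAsArray returns a (bands, rows, cols) array.

import dask
import dask.array as da
import gdal  # or: from osgeo import gdal on newer installs


@dask.delayed(pure=True)
def load_window(path, xoff, yoff, xsize, ysize):
    # Read a single spatial window of the file (all bands at once).
    return gdal.Open(path).ReadAsArray(xoff, yoff, xsize, ysize)


def file_as_dask_array(path, nbands, nrows, ncols, dtype, tile=512):
    # Build one dask array per file out of delayed windowed reads,
    # then stitch the tiles back together along y and x.
    rows = []
    for yoff in range(0, nrows, tile):
        ysize = min(tile, nrows - yoff)
        row = []
        for xoff in range(0, ncols, tile):
            xsize = min(tile, ncols - xoff)
            row.append(da.from_delayed(
                load_window(path, xoff, yoff, xsize, ysize),
                shape=(nbands, ysize, xsize), dtype=dtype))
        rows.append(da.concatenate(row, axis=2))   # join tiles along x
    return da.concatenate(rows, axis=1)            # join rows along y

The per-file arrays returned by file_as_dask_array could then be stacked with da.stack just as above.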

Alternatively you could use the code above, and then call rechunk to rechunk into smaller chunks. This would still result in reading in each file in a single task, but subsequent steps could work with smaller chunks. Whether this is worth it or not depends on the size of your individual files.

x = x.rechunk((500, 500, 500))  # or whatever chunks you want
– jiminy_crist
  • The code works when I run it on 60 files, but I get a memory error when I try to run it over 17,500 files. Each file has dimensions (52, 224, 464). After stacking them I rechunked to a chunk size of (1, 1, 224, 464). All I do to x is `x[:, :, 80, 80].compute()`. Another important detail is that supplying a cache to spill to disk did not prevent the memory error. – Philip Blankenau May 09 '17 at 04:04
  • In this case rechunking doesn't really make sense, since you're just selecting a small subset afterwards. In general you want to keep your chunks large enough that the cost of each individual task is much larger than the overhead of the scheduler. For more info see http://dask.pydata.org/en/latest/array-creation.html#chunks. A decent rule of thumb is at least 1e6 elements per chunk, and usually more than that. I wouldn't rechunk at all, and would just keep one chunk per file. – jiminy_crist May 09 '17 at 04:34
  • When rechunking, also inspect the blocksize (chunksize) of the file. Respecting the native blocksize (or a multiple of it) can really boost performance, since it prevents reading the same data from disk over and over; GDAL's block cache can also help prevent that (see the block-size sketch after these comments). – Rutger Kassies May 10 '17 at 09:23
  • @PhilipBlankenau, here is a (dramatic) example of reading a GRIB file in the correct and incorrect order: over a 100x difference, without caching, just to illustrate the potential impact: http://nbviewer.jupyter.org/gist/RutgerK/4faa43873ee6389873dd4de85de7c450 – Rutger Kassies May 10 '17 at 09:52
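To make the block-size advice in these comments concrete, here is a small sketch of inspecting a file's native block layout with GDAL before picking dask chunks; the file path is a placeholder.

import gdal  # or: from osgeo import gdal on newer installs

# Inspect the native block layout of one file so that dask chunks can be
# aligned to it (or a multiple of it); reading along native blocks avoids
# pulling the same data off disk repeatedly.
ds = gdal.Open('/path/to/one_file.grb')  # placeholder path
band = ds.GetRasterBand(1)
block_xsize, block_ysize = band.GetBlockSize()
print('raster size:', ds.RasterXSize, 'x', ds.RasterYSize,
      'with', ds.RasterCount, 'bands')
print('native block size:', block_xsize, 'x', block_ysize)
# Depending on the driver, a block may be a single row, a tile, or the
# whole band; choose dask chunks that cover whole blocks where possible.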