I have monthly Zarr files in S3 that contain gridded temperature data. I would like to pull multiple months of data for a single lat/lon point and build a dataframe from that time series. Some pseudocode:

import s3fs
import xarray as xr

# files, lat, long, start_date and end_date are defined elsewhere
s3 = s3fs.S3FileSystem()  # create the filesystem once and reuse it
datasets = []
for file in files:
    zarr_store = s3fs.S3Map(file, s3=s3)
    zarr = xr.open_zarr(store=zarr_store, consolidated=True)
    ds = zarr.sel(latitude=lat,
                  longitude=long,
                  time=slice(start_date.strftime("%Y-%m-%d"),
                             end_date.strftime("%Y-%m-%d")))
    datasets.append(ds)

con = xr.concat(datasets, dim='time')
df = con.to_dataframe()

This code works, but it is incredibly slow. I was hoping to use Dask to speed it up. My plan was to change the method to process one file at a time and return a dataframe. I would then call client.map() to generate all the dataframes and concatenate them at the end. I wound up with something similar to this:

def load(file, lat: float, long: float, start_date, end_date):

    s3 = s3fs.S3FileSystem()
    s3_path = file['s3_bucket'] + '/' + file['zarr_s3_key']
    zarr_store = s3fs.S3Map(s3_path, s3=s3)
    zarr = xr.open_zarr(store=zarr_store, consolidated=True)

    ds = zarr.sel(latitude=lat,
                  longitude=long,
                  time=slice(start_date.strftime("%Y-%m-%d"),
                             end_date.strftime("%Y-%m-%d"))
                 )

    tmp = ds.to_array().values  # use the selection made above; x is not defined inside this function
    df_time = zarr.coords['time'].sel(time=slice(start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))).values
    df = pd.DataFrame({'time': df_time, 'lat': lat, 'long': long, 'dat': tmp})
    df.set_index(['time', 'lat', 'long'], inplace=True)

    return df

if __name__ == '__main__':
    client = Client('tcp://xxx')

    start_date = date(2000, 1, 7)
    end_date = date(2000, 10, 20)
    lat = 2
    lon = 10

    # get the s3 locations of the zarr files from the db
    files = get_files()

    # try just running with one file
    res = client.submit(load, files[0], lat, lon, start_date, end_date) 

    # run them all
    future = client.map(load, files,
                        repeat(lat), repeat(lon),
                        repeat(start_date), repeat(end_date))
    x = client.gather(future)

This code runs fine when I connect the client to my local machine. But when I connect to a remote cluster, I get the following error on the xr.open_zarr call:

KeyError: 'XXX/data.zarr/.zmetadata'

I tried changing the code to open the Zarr stores outside the function and pass them in, but that only gave me NaNs as a result. Is there something I am missing? Is this not the right way to do what I'm trying to do?

David
  • Are the S3 files public? If so, could you share the full link to the data? This might help us help you better. – Ryan Apr 17 '20 at 12:48

2 Answers

If you just want to extract a time series at a point, you can create a Dask client and let xarray do the work in parallel. The example below uses only one Zarr dataset, but as long as the workers stay busy processing the chunks in each Zarr file, you wouldn't gain anything from processing the Zarr files in parallel.

import xarray as xr
import fsspec
import hvplot.xarray

from dask.distributed import Client

client = Client()  # starts a local cluster; pass a scheduler address to use a remote one

url = 's3://mur-sst/zarr'  # Amazon Public Data

ds = xr.open_zarr(fsspec.get_mapper(url, anon=True), consolidated=True)

timeseries = ds['analysed_sst'].sel(time=slice('2015-01-01','2020-01-01'),
                                    lat=43, 
                                    lon=-70).persist()

timeseries.hvplot()

This produces a plot of the time series at that point (image not reproduced here).

Here is the Full Jupyter Notebook
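
For the case in the question, where the data is split across several monthly Zarr stores, the same idea might look roughly like the sketch below. It assumes the stores share the coordinate names used in the question (latitude, longitude, time) and that the requested point matches a grid coordinate exactly. The function name point_timeseries is made up for illustration; it is not part of xarray.

```python
import xarray as xr


def point_timeseries(datasets, lat, lon, start, end):
    """Extract one grid point from several datasets and join them along time.

    `datasets` is an iterable of xarray Datasets, e.g. lazily opened Zarr stores.
    """
    # Select the point and time window first, so only small 1-D series are concatenated.
    points = [
        ds.sel(latitude=lat, longitude=lon).sel(time=slice(start, end))
        for ds in datasets
    ]
    combined = xr.concat(points, dim="time").sortby("time")
    # Converting to a dataframe triggers the actual reads; for Dask-backed
    # data with an active client, the chunk loads run on the workers.
    return combined.to_dataframe()


# Hypothetical usage, assuming `files` is a list of store paths readable by the workers:
# import fsspec
# datasets = [xr.open_zarr(fsspec.get_mapper(path), consolidated=True) for path in files]
# df = point_timeseries(datasets, lat=2, lon=10, start="2000-01-07", end="2000-10-20")
```

Opening each store is cheap when the metadata is consolidated, so the loop over files can stay on the client; the expensive chunk reads are what Dask parallelizes.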

Rich Signell

I was able to solve this problem, but I will leave the question up in case it helps anyone else in the future.

So it turned out to be a permissions issue. The workers did not have access to the S3 bucket, which is why I was getting the KeyError.
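
A quick way to check for this kind of problem is to ask every worker whether it can see the store's consolidated metadata. The sketch below is only an illustration: can_read_metadata and check_workers are hypothetical helper names, and it assumes the workers have s3fs installed and pick up credentials the same way S3FileSystem() does by default. Depending on the s3fs version, a permissions problem may show up as False rather than as an error message.

```python
def can_read_metadata(store_path, fs=None):
    """Report whether `<store_path>/.zmetadata` is visible from the current process.

    Returns True or False from the existence check, or the error text
    if the check itself fails.
    """
    if fs is None:
        # Imported here so the import happens on the worker that runs the check.
        import s3fs
        fs = s3fs.S3FileSystem()
    try:
        return bool(fs.exists(store_path.rstrip("/") + "/.zmetadata"))
    except Exception as exc:
        return f"{type(exc).__name__}: {exc}"


def check_workers(client, store_path):
    """Run the check on every worker; returns {worker_address: result}."""
    return client.run(can_read_metadata, store_path)


# Hypothetical usage, with the scheduler address and store path from the question:
# from dask.distributed import Client
# client = Client('tcp://xxx')
# print(check_workers(client, 'XXX/data.zarr'))
```

If the check passes on the local machine but not on the workers, the workers' credentials or bucket policy are the place to look.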

I'd still be open to hearing whether this is the best way to bulk load and process Zarr stores, though.

David
  • In the master version of `fsspec`/`s3fs`, errors other than "not found" will no longer give you the `KeyError`, but will show you the underlying error. – mdurant Apr 17 '20 at 12:59