I have monthly zarr files in s3 that have gridded temperature data. I would like to pull down multiple months of data for one lat/lon and create a dataframe of that time series. Some pseudo code:
datasets=[]
for file in files:
s3 = s3fs.S3FileSystem()
zarr_store = s3fs.S3Map(file, s3=s3)
zarr = xr.open_zarr(store=zarr_store, consolidated=True)
ds = zarr.sel(latitude=lat,
longitude=long,
time=slice(start_date.strftime("%Y-%m-%d"),
end_date.strftime("%Y-%m-%d"))
)
datasets.append(ds)
con = xr.concat(datasets, dim='time')
df = con.to_dataframe()
so this code will work, but is incredibly slow. I was hoping to use dask to speed this up. My plan was to change the method to process one file at a time and return a dataframe. I would then call client.map() and generate all the dfs, then concat them together at the end. So I wound up with something similar to this:
def load(file, lat: float, long: float, start_date, end_date):
s3 = s3fs.S3FileSystem()
s3_path = file['s3_bucket'] + '/' + file['zarr_s3_key']
zarr_store = s3fs.S3Map(s3_path, s3=s3)
zarr = xr.open_zarr(store=zarr_store, consolidated=True)
ds = zarr.sel(latitude=lat,
longitude=long,
time=slice(start_date.strftime("%Y-%m-%d"),
end_date.strftime("%Y-%m-%d"))
)
tmp = x.result().to_array().values
df_time = zarr.coords['time'].sel(time=slice(start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))).values
df = pd.DataFrame({'time': df_time, 'lat': lat, 'long': long, 'dat': tmp})
df.set_index(['time', 'lat', 'long'], inplace=True)
return df
if __name__ == '__main__':
client = Client('tcp://xxx')
start_date = date(2000, 1, 7)
end_date = date(2000, 10, 20)
lat = 2
lon = 10
# get the s3 locations of the zarr files from the db
files = get_files()
# try just running with one file
res = client.submit(load, files[0], lat, lon, start_date, end_date)
# run them all
future = client.map(load, files,
repeat(lat), repeat(lon),
repeat(start_date), repeat(end_date))
x = client.gather(future)
This code runs fine when I connect client to just my local machine. But when I try to connect to a remote cluster I get the following error on the xr.open_zarr call:
KeyError: 'XXX/data.zarr/.zmetadata'
I tried changing up the code and loading the zarrs outside the method call and passing them in, but that only gave me nans as a result. Is there something I am missing? Is this not the correct way to solve what I'm trying to do?