I am quite new to parallel computing and Dask. In addition, this is my first question here on Stack Overflow, and I hope everything works out.
Problem Description
I want to set up a bias correction of climate data (e.g. total precipitation, `tp`). For this I have three datasets:

- Actual predictions (`ds_pred`), containing several different ensemble members for a period of 215 days from an initial date. (Dimensions: `ens` x `days` x `lat` x `lon`)
- Historical model data (`ds_mdl_hist`) for a given period of time (e.g. 1981-2016). (Dimensions: `year` x `days` x `ens` x `lat` x `lon`)
- Observational data (`ds_obs`). (Dimensions: `year` x `days` x `lat` x `lon`)
I created some dummy data to illustrate my problem. Please note: my real datasets contain a lot more data.
# imports
import numpy as np
import xarray as xr
from scipy.interpolate import interp1d

# create dummy data
days = np.arange(0, 215)
lat = np.arange(35, 50)
lon = np.arange(10, 20)
ens = np.arange(0, 10)
year = np.arange(1981, 2016)
data_pred = np.random.randint(0, 100, size=(len(days), len(ens), len(lat), len(lon)))
data_mdl_hist = np.random.randint(0, 100, size=(len(year), len(days), len(ens), len(lat), len(lon)))
data_obs = np.random.randint(0, 100, size=(len(year), len(days), len(lat), len(lon)))
# create datasets
ds_pred = xr.Dataset({'tp': xr.DataArray(
    data=data_pred,
    dims=['days', 'ens', 'lat', 'lon'],
    coords={'days': days, 'ens': ens, 'lat': lat, 'lon': lon},
    attrs={'units': 'mm/day'})})
ds_mdl_hist = xr.Dataset({'tp': xr.DataArray(
    data=data_mdl_hist,
    dims=['year', 'days', 'ens', 'lat', 'lon'],
    coords={'year': year, 'days': days, 'ens': ens, 'lat': lat, 'lon': lon},
    attrs={'units': 'mm/day'})})
ds_obs = xr.Dataset({'tp': xr.DataArray(
    data=data_obs,
    dims=['year', 'days', 'lat', 'lon'],
    coords={'year': year, 'days': days, 'lat': lat, 'lon': lon},
    attrs={'units': 'mm/day'})})
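One note on this: as far as I understand, `dask="parallelized"` only parallelizes anything if the inputs are dask-backed. My real data comes from files and is chunked; to mimic that with the dummy data, I chunk over the spatial dimensions (the chunk sizes here are just an example, not tuned):

# chunk over lat/lon so apply_ufunc sees dask arrays; the stacked core
# dimensions used later must stay in a single chunk, so I leave them alone
ds_pred = ds_pred.chunk({'lat': 5, 'lon': 5})
ds_mdl_hist = ds_mdl_hist.chunk({'lat': 5, 'lon': 5})
ds_obs = ds_obs.chunk({'lat': 5, 'lon': 5})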
For each day in `ds_pred`, I slice the corresponding days in `ds_mdl_hist` and `ds_obs` over each year. Furthermore, I do not select only the single day, but a 30-day window around it. Example for day `k = 20`:
# Slice data corresponding to time step
k = 20
# Predictions
ds_pred_sub = ds_pred.isel(days=k)
# Pool a 30-day window around the time step k (15 days before, 14 after)
window = 15
day_range = np.arange(k-window, k+window)
# Historical model
ds_mdl_hist_sub = ds_mdl_hist.isel(days=day_range)
# Observational
ds_obs_sub = ds_obs.isel(days=day_range)
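Note that near the start and end of the 215 days this window would reach outside the valid index range (negative indices would even silently wrap around with `isel`). A clipped variant, which is only my assumption of how to handle the edges:

# clip the window to the valid range of days (the window shrinks at the edges)
day_range = np.arange(max(k - window, 0), min(k + window, len(days)))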
In order to run `apply_ufunc`, I stack some dimensions.
# Stack dimensions in order to use apply_ufunc over only one core dimension
ds_mdl_hist_sub = ds_mdl_hist_sub.stack(days_ens=("ens", "days", "year"))
ds_obs_sub = ds_obs_sub.stack(days_year=('days', 'year'))
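With the dummy shapes, this yields one-dimensional samples per grid point (my arithmetic: 10 ens x 30 days x 35 years, and 30 days x 35 years):

print(ds_mdl_hist_sub.sizes['days_ens'])  # 10 * 30 * 35 = 10500
print(ds_obs_sub.sizes['days_year'])      # 30 * 35 = 1050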
My function for the bias correction includes the calculation of the distribution for both the subset of historical model data (`ds_mdl_hist_sub`) and observations (`ds_obs_sub`), as well as the interpolation with the subset of the predictions (`ds_pred_sub`). The function returns the corrected predictions (`pred_corr`).
def bc_module(pred, mdl_hist, obs):
    # Quantile levels at which both distributions are evaluated
    p = np.linspace(0, 1, 1000)
    # Quantiles of the historical model
    q_mdl = np.quantile(mdl_hist, p, interpolation='midpoint')
    # Quantiles of the observations
    q_obs = np.quantile(obs, p, interpolation='midpoint')
    # Do the bias correction: map predictions onto the model's quantile levels
    Y_pred = interp1d(q_mdl, p)(pred)
    # Do the back-transformation to the data space of the observations
    pred_corr = interp1d(p, q_obs, bounds_error=False)(Y_pred)
    return pred_corr
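To make the expected shapes explicit: per grid point, `bc_module` takes an `(ens,)` vector of predictions plus the stacked historical and observational samples and returns an `(ens,)` vector of corrected values. A quick check with plain NumPy dummy values (the sizes follow from the stacking above):

pred = np.random.uniform(0.1, 0.9, 10)  # one grid point: (ens,); values kept inside
                                        # the historical range to avoid bounds errors
mdl_hist = np.random.rand(10500)        # stacked (ens * days * year,)
obs = np.random.rand(1050)              # stacked (days * year,)
print(bc_module(pred, mdl_hist, obs).shape)  # (10,)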
Because my real datasets contain much more data, I want to apply the `bc_module` in parallel by using `apply_ufunc` with `dask="parallelized"` over my "time" dimension, vectorizing over each `lat`, `lon`. This is my call:
# Xarray apply_ufunc
pred_corr = xr.apply_ufunc(
    bc_module, ds_pred_sub, ds_mdl_hist_sub, ds_obs_sub,
    input_core_dims=[['ens'], ['days_ens'], ['days_year']],
    output_core_dims=[['ens']],
    vectorize=True,
    dask="parallelized",
    output_dtypes=[np.float64]).compute()
This works fine, but it seems to me that my code is not that fast.
Open Questions
- I really do not know how to loop over the 215 days (`k`) in an efficient way. A simple `for` loop is not sufficient, because I have several variables to correct and the run time of my real dataset is about 1 hour per variable. Is it possible to use `dask` for this as well? Maybe something like `dask.delayed`, because it is embarrassingly parallel (see the sketch after this list).
- In particular, I wonder if I can use `dask` to parallelize my outer `for` loop over the 215 days when `apply_ufunc` already uses `dask="parallelized"`.
- Is there a more efficient way than using `apply_ufunc`? I really want to avoid nested loops over first `lat` and `lon` and second the 215 days.
- Is it possible to add some arguments to my `apply_ufunc` call, so that the looping over the 215 days is included?
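For reference, this is the kind of `dask.delayed` structure I have in mind for the outer loop. It is only a sketch: `correct_one_day` is a hypothetical helper wrapping the slicing, stacking and `apply_ufunc` call from above, and it assumes the datasets are plain NumPy-backed (i.e. the chunking step further up is skipped), so each task does in-memory work. Whether this can instead be combined with `dask="parallelized"` inside the tasks is exactly what I am unsure about.

import dask

def correct_one_day(k, window=15):
    # hypothetical helper: bias-correct a single day k; assumes numpy-backed
    # datasets, so no dask="parallelized" inside the task
    ds_pred_sub = ds_pred.isel(days=k)
    day_range = np.arange(max(k - window, 0), min(k + window, len(days)))
    mdl = ds_mdl_hist.isel(days=day_range).stack(days_ens=("ens", "days", "year"))
    obs = ds_obs.isel(days=day_range).stack(days_year=("days", "year"))
    return xr.apply_ufunc(
        bc_module, ds_pred_sub, mdl, obs,
        input_core_dims=[['ens'], ['days_ens'], ['days_year']],
        output_core_dims=[['ens']],
        vectorize=True)

lazy = [dask.delayed(correct_one_day)(k) for k in range(len(days))]
pred_corr_all = dask.compute(*lazy)  # tuple of 215 corrected datasets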