
I am quite new to parallel computing and Dask. In addition, this is my first question here on Stack Overflow, and I hope everything works out.

Problem Description

I want to set up a bias correction of climate data (e.g. total precipitation, tp). For this I have three datasets:

  • Actual predictions (ds_pred), containing several different ensemble members for a period of 215 days from an initial date. (Dimensions: days x ens x lat x lon)
  • Historical model data (ds_mdl_hist) for a given period of time (e.g. 1981 - 2016). (year x days x ens x lat x lon)
  • Observational data (ds_obs). (year x days x lat x lon)

I created some dummy data to illustrate my problem. Please note: my real datasets contain a lot more data.

import numpy as np
import xarray as xr
from scipy.interpolate import interp1d

# create dummy data
days          = np.arange(0,215)
lat           = np.arange(35, 50)
lon           = np.arange(10,20)
ens           = np.arange(0,10)
year          = np.arange(1981,2016)
data_pred     = np.random.randint(0, 100, size=(len(days), len(ens), len(lat), len(lon)))
data_mdl_hist = np.random.randint(0, 100, size=(len(year),len(days), len(ens), len(lat), len(lon)))
data_obs      = np.random.randint(0, 100, size=(len(year),len(days), len(lat), len(lon)))

# create dataset
ds_pred     = xr.Dataset({'tp': xr.DataArray(
                data   = data_pred,   # enter data here
                dims   = ['days', 'ens', 'lat', 'lon'],
                coords = {'days': days, 'ens': ens, 'lat': lat, 'lon': lon},
                attrs  = {'units'     : 'mm/day'})})

ds_mdl_hist = xr.Dataset({'tp': xr.DataArray(
                data   = data_mdl_hist,   # enter data here
                dims   = ['year', 'days','ens', 'lat', 'lon'],
                coords = {'year': year, 'days': days, 'ens': ens, 'lat': lat, 'lon': lon},
                attrs  = {'units'     : 'mm/day'})})


ds_obs      = xr.Dataset({'tp': xr.DataArray(
                data   = data_obs,   # enter data here
                dims   = ['year', 'days', 'lat', 'lon'],
                coords = {'year': year, 'days': days, 'lat': lat, 'lon': lon},
                attrs  = {'units'     : 'mm/day'})})

For each day in ds_pred, I slice the corresponding days in ds_mdl_hist and ds_obs over all years. Furthermore, I do not select only the single day, but a 30-day window around it. Example for day=20:

# Slice data corresponding to time step
k = 20

# Predictions
ds_pred_sub = ds_pred.isel(days=k)
# Pool 15 days before and after the time step k
window = 15
day_range = np.arange(k-window, k+window)
# Historical model
ds_mdl_hist_sub = ds_mdl_hist.isel(days=day_range)
# Observational
ds_obs_sub = ds_obs.isel(days=day_range)

In order to run apply_ufunc, I stack some dimensions.

# Stack dimensions in order to use apply_ufunc over only one core dimension
ds_mdl_hist_sub = ds_mdl_hist_sub.stack(days_ens=("ens", "days", "year"))
ds_obs_sub = ds_obs_sub.stack(days_year=('days', 'year'))
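
For reference, a quick check of the stacked sizes in the dummy example (30 pooled days, 35 years, 10 ensemble members):

# Quick check of the stacked core-dimension sizes in the dummy example
print(ds_mdl_hist_sub.sizes['days_ens'])    # 10 ens * 30 days * 35 years = 10500
print(ds_obs_sub.sizes['days_year'])        # 30 days * 35 years = 1050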

My bias-correction function calculates the distributions of both the subset of historical model data (ds_mdl_hist_sub) and the observations (ds_obs_sub), and then interpolates the subset of the predictions (ds_pred_sub) between them. The function returns the corrected predictions (pred_corr).

def bc_module(pred, mdl_hist, obs):
    # Quantile levels at which the distributions are evaluated
    p     = np.linspace(0, 1, 1000)
    # Quantiles of the historical model data
    # ('method' replaces the deprecated 'interpolation' keyword of np.quantile)
    q_mdl = np.quantile(mdl_hist, p, method='midpoint')
    # Quantiles of the observations
    q_obs = np.quantile(obs, p, method='midpoint')

    # Do the bias correction: map the predictions onto quantile levels of the historical model
    Y_pred = interp1d(q_mdl, p)(pred)

    # Do the back-transformation to the data space using the observed quantiles
    pred_corr = interp1d(p, q_obs, bounds_error=False)(Y_pred)

    return pred_corr
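
For illustration, a minimal standalone call of bc_module on small 1-D dummy arrays (shapes and value ranges are only an assumption for this check; the pred values are kept inside the range of mdl_hist because the first interp1d uses its default bounds_error=True):

# Sanity check of bc_module on small 1-D dummy arrays
rng      = np.random.default_rng(0)
pred     = rng.uniform(10, 90, size=10)         # e.g. one grid point, 10 ensemble members
mdl_hist = rng.uniform(0, 100, size=30 * 35)    # e.g. 30 pooled days x 35 years (stacked)
obs      = rng.uniform(0, 100, size=30 * 35)
print(bc_module(pred, mdl_hist, obs).shape)     # -> (10,)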

Because my real datasets contain much more data, I want to apply the bc_module in parallel by using apply_ufunc with dask="parallelized" over my "time" dimension, vectorizing over each lat and lon. This is my call:

# Xarray apply_ufunc
pred_corr = xr.apply_ufunc(
    bc_module, ds_pred_sub, ds_mdl_hist_sub, ds_obs_sub,
    input_core_dims=[['ens'], ['days_ens'], ['days_year']],
    output_core_dims=[['ens']],
    vectorize=True,
    dask="parallelized",
    output_dtypes=[np.float64],
).compute()

This works fine, but it seems to me that my code is not that fast.

Open Questions

  • I really do not know how I can loop over the 215 days (k) in an efficient way. A simple for loop is not sufficient, because I have several variables to correct, and the run time for my real dataset is about 1 hour per variable. Is it possible to use dask for this as well? Maybe something like dask.delayed, because it is embarrassingly parallel (see the sketch after this list).
  • In particular, I wonder if I can use dask to parallelize my outer for loop over the 215 days when apply_ufunc already uses dask="parallelized".
  • Is there a more efficient way than using apply_ufunc? I really want to avoid nested loops, first over lat and lon and second over the 215 days.
  • Is it possible to add some options to my apply_ufunc call, so that the looping over the 215 days is included?
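
The following is only a rough sketch of the dask.delayed idea from the first bullet, not a tested solution. It assumes the NumPy-backed dummy data from above, so the inner apply_ufunc runs without dask="parallelized" and all parallelism comes from the delayed tasks; the helper correct_day is hypothetical and simply wraps the slicing, stacking and apply_ufunc call from above for a single day k, clipping the 30-day window at the edges.

import dask

def correct_day(k, window=15):
    # Hypothetical per-day helper: slice, stack and correct a single day k
    day_range = np.arange(max(k - window, 0), min(k + window, len(days)))
    pred_sub  = ds_pred.isel(days=k)
    mdl_sub   = ds_mdl_hist.isel(days=day_range).stack(days_ens=("ens", "days", "year"))
    obs_sub   = ds_obs.isel(days=day_range).stack(days_year=("days", "year"))
    return xr.apply_ufunc(
        bc_module, pred_sub, mdl_sub, obs_sub,
        input_core_dims=[['ens'], ['days_ens'], ['days_year']],
        output_core_dims=[['ens']],
        vectorize=True,
    )

# One lazy task per day; dask schedules them in parallel
tasks     = [dask.delayed(correct_day)(k) for k in range(len(days))]
corrected = dask.compute(*tasks)
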
  • Hi @clemens-borkenhagen, welcome to SO! In your actual problem, are all your inputs to the `apply_ufunc` function dask arrays? I suspect that all the stacking/slicing you do before calling this function is causing performance problems. Also, you may be interested in some of the bias correction approaches in https://github.com/pangeo-data/scikit-downscale. There we chose to use Xarray's `map_blocks` function to manage parallelization. – jhamman Jul 25 '22 at 15:44
  • Hello @jhamman, thanks for your quick response. You are right, all of my arrays go as `dask.array` into the `apply_ufunc`. Is there an opportunity for efficient slicing when using `dask`? – Clemens Borkenhagen Jul 26 '22 at 07:40
  • A lot depends on the chunk configuration in your Dask arrays. Since your bias correction method is done along the time dimension, you'll likely want to make the chunk size == len(time). Generally speaking though, slicing dask arrays generates new chunks and can lead to large/complex task graphs. As I mentioned above, I've found `map_blocks` to be a better approach for these sorts of bias correction methods. – jhamman Jul 26 '22 at 18:58
  • Unfortunately, the problem still exists. It would be really nice if this worked with `apply_ufunc` and `dask`. Maybe this would work with a nice combination of selecting the right chunks and persisting/loading the data? – Clemens Borkenhagen Dec 14 '22 at 14:11
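
Following up on the chunking suggestion in the comments, a minimal sketch (chunk sizes are only an assumption) of how the dummy datasets could be chunked so that the dimensions reduced inside bc_module stay in single chunks and dask="parallelized" only splits work over lat and lon:

# Keep the reduced dimensions in one chunk each; chunk only over lat/lon
ds_pred     = ds_pred.chunk({'days': -1, 'ens': -1, 'lat': 5, 'lon': 5})
ds_mdl_hist = ds_mdl_hist.chunk({'year': -1, 'days': -1, 'ens': -1, 'lat': 5, 'lon': 5})
ds_obs      = ds_obs.chunk({'year': -1, 'days': -1, 'lat': 5, 'lon': 5})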
