What I've tried
I have an embarrassingly parallel for loop in which I iterate over 90x360 values in two nested for loops and do some computation. I tried dask.delayed
to parallelize the for loops as per this tutorial although it is demonstrated for a very small set of iterations.
Problem description
I'm surprised to find that the parallel code took 2h 39 mins compared to non-parallel timing of 1h 54 mins which means that I'm doing something fundamentally wrong or maybe the task graphs are too big to handle?
Set-up info
This test was done for a subset of my iterations that is, 10 x 360, but the optimized code should be able to handle 90 x 360 nested iterations. My mini-cluster has 66 cores and 256 GB of RAM and the 2 data files are 4 GB and < 1 GB each. I'm also confused between the approach of multi-processing
vs multi-threading
for this task. I thought running parallel loops in multiple processes similar to joblib
default implementation would be the way to go as each loop works on independent grid-points. But, this suggests that multi-threading
is faster and should be preferred if one doesn't have a GIL issue (which I don't). So, for the timing above, I used dask.delay
default scheduling option which uses multi-threading option for a single process.
Simplified code
import numpy as np
import pandas as pd
import xarray as xr
from datetime import datetime
from dask import compute, delayed
def add_data_from_small_file(lat):
""" for each grid-point, get time steps from big-file as per mask, and
compute data from small file for those time-steps
Returns: array per latitude which is to be stacked
"""
for lon in range(0,360):
# get time steps from big file
start_time = big_file.time.values[mask1[:, la, lo]]
end_time = big_file.time.values[[mask2[:,la,lo]]
i=0
for t1, t2 in zip(start_time, end_time):
# calculate value from small file for each time pair
temp_var[i] = small_file.sel(t=slice(t1, t2)).median()
i=i+1
temp_per_lon[:, lon] = temp_var
return temp_per_lon
if __name__ == '__main__':
t1 = datetime.now()
small_file = xr.open_dataarray('small_file.nc') # size < 1 GB, 10000x91
big_file = xr.open_dataset('big_file.nc') # size = 4 GB, 10000x91x360
delayed_values = [delayed(add_data_from_small_file)(lat) for lat in range(0,10)] # 10 loops for testing, to scale to 90 loops
# have to delay stacking to avoid memory error
stack_arr = delayed(np.stack)(delayed_values, axis=1)
stack_arr = stack_arr.compute()
print('Total run time:{}'.format(datetime.now()-t1))