I might misunderstand how Dasks submit()
function is working. If I'm submitting a function of my class that is initializing a parameter it is not working.
Question: What is the correct way to submit a class to a dask-cluster using .submit()
?
So, I have a class to run several different ML-models on a data set:
class EstimatorSelectionHelper:
def __init__(self, models, params):
self.models = models
self.params = params
self.keys = models.keys()
self.grid_searches = {}
def fit(self, X, y, algo = 'gridsearchcv', **kwargs):
if algo == 'gridsearchcv':
for key in self.keys:
print('Running GridSearchCV for %s.' % key)
model = self.models[key]
params = self.params[key]
grid_search = GridSearchCV(model, params, **kwargs)
grid_search.fit(X, y)
self.grid_searches[key] = grid_search
print('Done.')
elif algo == 'randomizedsearchcv':
for key in self.keys:
print('Running RandomizedSearchCV for %s.' % key)
model = self.models[key]
params = self.params[key]
grid_search = dcv.RandomizedSearchCV(model, params, **kwargs)
grid_search.fit(X, y)
self.grid_searches[key] = grid_search
print('Done.')
else:
print('None defined alogorithm.')
def score_summary(self, sort_by='mean_test_score'):
frames = []
for name, grid_search in self.grid_searches.items():
frame = pd.DataFrame(grid_search.cv_results_)
frame = frame.filter(regex='^(?!.*param_).*$')
frame['estimator'] = len(frame)*[name]
frames.append(frame)
df = pd.concat(frames)
df = df.sort_values([sort_by], ascending=False)
df = df.reset_index()
df.drop(df.columns[df.columns.str.contains('rank_test_|index')], axis=1, inplace=True)
columns = df.columns.tolist()
columns.remove('estimator')
columns = ['estimator']+columns
df = df[columns]
return df
I'm initializing the class:
helper = EstimatorSelectionHelper(models, params)
Where models and params look like:
models = {
'LinearRegression': LinearRegression(),
'Lasso': Lasso(),
}
params = {
'LinearRegression': {},
'Lasso': {'max_iter': [5000],
'alpha': uniform(0.01, 10)},
}
Then I submit the function .fit()
to the cluster:
client.submit(helper.fit,
future_G,
future_y,
algo='randomizedsearchcv',
n_iter= 10,
scoring=['r2','neg_root_mean_squared_error'],
refit=test_score, n_jobs=4, cv=cv).result()
When I then call
results = helper.score_summary('mean_test_r2')
I get an error:
ValueError: No objects to concatenate
and helper.grid_searches
is empty.
I'm not sure what I'm doing wrong? Any help is appreciated. Thanks!