
I might be misunderstanding how Dask's submit() function works. When I submit a method of my class that sets an attribute on the instance, it doesn't work.

Question: What is the correct way to submit a class to a Dask cluster using .submit()?

So, I have a class that runs several different ML models on a dataset:

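import pandas as pd
import dask_ml.model_selection as dcv
from sklearn.model_selection import GridSearchCV

class EstimatorSelectionHelper: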
    
    def __init__(self, models, params):
        self.models = models
        self.params = params
        self.keys = models.keys()
        self.grid_searches = {}
    
    def fit(self, X, y, algo = 'gridsearchcv', **kwargs):
    
        if algo == 'gridsearchcv':
        
            for key in self.keys:
                print('Running GridSearchCV for %s.' % key)
                model = self.models[key]
                params = self.params[key]
                grid_search = GridSearchCV(model, params, **kwargs)
                grid_search.fit(X, y)
                self.grid_searches[key] = grid_search
            print('Done.')
        
        elif algo == 'randomizedsearchcv':
    
            for key in self.keys:
                print('Running RandomizedSearchCV for %s.' % key)
                model = self.models[key]
                params = self.params[key]
                grid_search = dcv.RandomizedSearchCV(model, params, **kwargs)
                grid_search.fit(X, y)
                self.grid_searches[key] = grid_search
            print('Done.')
            
        else:
            print('Undefined algorithm.')
    
    def score_summary(self, sort_by='mean_test_score'):
        frames = []
        for name, grid_search in self.grid_searches.items():
            
            frame = pd.DataFrame(grid_search.cv_results_)
            frame = frame.filter(regex='^(?!.*param_).*$')
            frame['estimator'] = len(frame)*[name]
            frames.append(frame)
        df = pd.concat(frames)
        
        df = df.sort_values([sort_by], ascending=False)
        df = df.reset_index()
        df.drop(df.columns[df.columns.str.contains('rank_test_|index')], axis=1, inplace=True)
        
        columns = df.columns.tolist()
        columns.remove('estimator')
        columns = ['estimator']+columns
        df = df[columns]
        return df

I'm initializing the class:

helper = EstimatorSelectionHelper(models, params)

Where models and params look like:

from scipy.stats import uniform
from sklearn.linear_model import Lasso, LinearRegression

models = {
    'LinearRegression': LinearRegression(),
    'Lasso': Lasso(),
}

params = {
    'LinearRegression': {},
    'Lasso': {'max_iter': [5000],
              'alpha': uniform(0.01, 10)},
}

Then I submit the .fit() method to the cluster:

client.submit(helper.fit,
              future_G,
              future_y,
              algo='randomizedsearchcv',
              n_iter=10,
              scoring=['r2', 'neg_root_mean_squared_error'],
              refit=test_score, n_jobs=4, cv=cv).result()

When I then call

results = helper.score_summary('mean_test_r2')

I get an error:

ValueError: No objects to concatenate

and helper.grid_searches is empty.

I'm not sure what I'm doing wrong. Any help is appreciated. Thanks!

Christine

Answer

You are facing a fundamental misunderstanding of how Dask works. When you call submit, your instance is serialised and sent to a worker; on that worker, the instance is recreated and the method is run. Running the method mutates the state of the instance, but that instance is the copy held by the worker, not the original one in your client process.
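The effect can be reproduced without a cluster. The sketch below is a minimal illustration under stated assumptions: Helper is a hypothetical, trimmed-down stand-in for EstimatorSelectionHelper, run_on_worker is a hypothetical helper, and copy.deepcopy stands in for the serialise-and-recreate step (Dask actually pickles the task, but the outcome for the client's object is the same: the worker operates on a separate copy).

```python
import copy


class Helper:
    """Hypothetical stand-in for EstimatorSelectionHelper."""

    def __init__(self):
        self.grid_searches = {}

    def fit(self, key):
        # Mutates whichever instance the method is bound to
        self.grid_searches[key] = "fitted"


def run_on_worker(obj, method_name, *args):
    # Simulate shipping the object to a worker: the worker receives a copy,
    # not the client's original (deepcopy stands in for the pickle round trip)
    worker_copy = copy.deepcopy(obj)
    getattr(worker_copy, method_name)(*args)
    return worker_copy


helper = Helper()
worker_copy = run_on_worker(helper, "fit", "Lasso")
print(helper.grid_searches)       # {} -- the client's instance is untouched
print(worker_copy.grid_searches)  # {'Lasso': 'fitted'}
```

This mirrors the symptom in the question: the fitted searches exist only on the copy, so helper.grid_searches stays empty on the client.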

In short, tasks in Dask are functional: you should return the output rather than rely on mutation. What you are doing is not, in fact, stateful, so there is no need to encapsulate it in a class:

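import dask_ml.model_selection as dcv

def fit(model, params, X, y, **kwargs):
    # Build and fit the search on the worker, then return it to the client
    grid_search = dcv.RandomizedSearchCV(model, params, **kwargs)
    grid_search.fit(X, y)
    return grid_search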
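On the client side, the functional version means submitting one task per model and collecting the returned values yourself, instead of reading them off a mutated object. The sketch below assumes a stdlib ThreadPoolExecutor as a stand-in for a Dask client (its submit() and Future.result() calls mirror Client.submit and Dask futures), and a hypothetical toy fit that averages a list of numbers in place of a real search, so it runs without Dask or scikit-learn installed.

```python
from concurrent.futures import ThreadPoolExecutor


def fit(name, data):
    # Hypothetical stand-in for fitting one search: it returns its result
    # instead of storing it on an object
    return {"estimator": name, "mean_test_score": sum(data) / len(data)}


scores = {"LinearRegression": [0.5, 0.7], "Lasso": [0.8, 0.6, 1.0]}

# ThreadPoolExecutor stands in for a Dask client here
with ThreadPoolExecutor(max_workers=2) as executor:
    # One task per model, keyed by model name
    futures = {name: executor.submit(fit, name, data) for name, data in scores.items()}
    # Collect the returned values on the client side
    results = {name: fut.result() for name, fut in futures.items()}

best = max(results.values(), key=lambda r: r["mean_test_score"])
print(best["estimator"])  # Lasso
```

With a real cluster, the same loop would call client.submit(fit, models[key], params[key], X, y, **kwargs) for each key; client.get_executor() also returns a concurrent.futures-style executor backed by the cluster, if you prefer this exact interface.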

If you insist on using the class, you can maybe make it work by returning self from your method. However, be warned: Dask identifies a task by a hash of the function and its arguments (for a bound method, that includes the instance) and reuses an earlier result when the hash matches, so you would probably need to implement a __dask_tokenize__ method on the class for it to work reliably.
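The return-self workaround can be sketched as follows, under the same kind of assumptions: Helper and submit_and_wait are hypothetical, and copy.deepcopy stands in for the worker receiving its own copy of the instance. The key step is rebinding the client-side name to the returned object. With real Dask you would additionally need to deal with the hashing caveat above, for example via __dask_tokenize__ or by passing pure=False to client.submit.

```python
import copy


class Helper:
    """Hypothetical, trimmed-down stand-in for EstimatorSelectionHelper."""

    def __init__(self):
        self.grid_searches = {}

    def fit(self, key):
        self.grid_searches[key] = "fitted"
        # Return the (worker-side) instance so the caller can get it back
        return self


def submit_and_wait(method, *args):
    # Stand-in for client.submit(method, *args).result(): run the method on a
    # copy of its bound instance, as a worker would, and return its result
    worker_copy = copy.deepcopy(method.__self__)
    return getattr(worker_copy, method.__name__)(*args)


original = Helper()
helper = submit_and_wait(original.fit, "Lasso")  # rebind to the returned copy
print(original.grid_searches)  # {}
print(helper.grid_searches)    # {'Lasso': 'fitted'}
```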

Note that the pattern you are trying to use fits Dask's Actors model, but I think it's much easier to turn it into functions than to actually use an Actor.

mdurant
After a bit of back and forth, I've decided to use a function instead of the class. It seems to work. Thanks! – Christine, May 29 '22 at 23:04