Just starting out on my Python journey. The ideas below are not my own. All the credit goes to David S. Batista https://www.davidsbatista.net/blog/2018/02/23/model_optimization/
who modified code by Panagiotis Katsaroumpas and shared it.
What I have done is modify David's code a bit by adding a user defined score and a preprocessing step that includes data imputation and scaling prior to model estimation. So here goes:
# import the libraries
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
Imputer = IterativeImputer(max_iter=10, random_state=15) # I used a custom wrapper
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import IsolationForest
from inne import IsolationNNE
## create the scoring for the models and save it to file
## in current working directory as 'scorers.py'
# def scorer_decision(estimator, X):
# return np.nanmean(estimator.decision_function(X))
## import decision function score saved as .py file
## into working directory of project
from scorers import scorer_decision
class EstimatorSelectionHelper:
def __init__(self, models, params):
if not set(models.keys()).issubset(set(params.keys())):
missing_params = list(set(models.keys()) - set(params.keys()))
raise ValueError("Some estimators are missing parameters: {}".format(missing_params))
self.models = models
self.params = params
self.keys = models.keys()
self.grid_searches = {}
def fit(self, X, y=None, cv=5, n_jobs=3, verbose=1, scoring=scorer_decision, refit=True):
for key in self.keys:
print("Running GridSearchCV for %s." % key)
model = self.models[key]
params = self.params[key]
gs = GridSearchCV(estimator=model,
param_grid=params,
cv=cv,
n_jobs=n_jobs,
verbose=verbose,
scoring=scoring,
refit=refit,
return_train_score=True)
gs.fit(X,y=None)
self.grid_searches[key] = gs
def score_summary(self, sort_by='mean_score'):
def row(key, scores, params):
d = {
'estimator': key,
'min_score': min(scores),
'max_score': max(scores),
'mean_score': np.mean(scores),
'std_score': np.std(scores),
}
return pd.Series({**params,**d})
rows = []
for k in self.grid_searches:
print(k)
params = self.grid_searches[k].cv_results_['params']
scores = []
for i in range(self.grid_searches[k].cv):
key = "split{}_test_score".format(i)
r = self.grid_searches[k].cv_results_[key]
scores.append(r.reshape(len(params),1))
all_scores = np.hstack(scores)
for p, s in zip(params, all_scores):
rows.append((row(k, s, p)))
df = pd.concat(rows, axis=1).T.sort_values([sort_by], ascending=False)
columns = ['estimator', 'min_score', 'mean_score', 'max_score', 'std_score']
columns = columns + [c for c in df.columns if c not in columns]
return df[columns]
# list of numeric features to impute
numeric_columns = list(Xtrain.select_dtypes(include = 'number').columns)
# pipeline for processing numerical feeatures
numeric_transformer = Pipeline([
('imputer', Imputer()),
('scaler', StandardScaler())
])
# column transformer
column_transformer = ColumnTransformer([
('numeric_pipeline', numeric_transformer, numeric_columns)
])
# grid search parameters for models
num_estimators = np.linspace(100, 200, num = 5, endpoint = True).astype(int)
max_samples = np.linspace(0.70, 1.00, num = 5)
contamination = np.linspace(0.04, 0.10, num = 5, endpoint = True)
max_features = np.arange(start = 1, stop = Xdata.shape[1]+1, step = 1)
# estimators to use
models1 = {
'iforest': IsolationForest(n_jobs = -1, random_state = 3),
'iNNE': IsolationNNE(random_state = 3)
}
# parameters
params1 = {
# isolation forest grid parameters
'iforest': {
'n_estimators': num_estimators,
'max_samples': max_samples,
'contamination': contamination,
'max_features': max_features,
'bootstrap': [False]
},
# inne grid parameters
'iNNE': {
'n_estimators': num_estimators,
'max_samples': max_samples,
'contamination': contamination
}
}
## run the models
# create EstimatorSelectionHelper by passing models and parameters
estimators = EstimatorSelectionHelper(models1, params1)
# create pipeline
pipe = Pipeline([
('ct', column_transformer),
('models', estimators)
])
pipe.fit(Xdata)
## get summary output
output = pipe.named_steps.models.score_summary(sort_by = 'max_score')
output.head()