I'm trying to port over some "parallel" Python code to Azure Databricks. The code runs perfectly fine locally, but somehow doesn't on Azure Databricks. The code leverages the multiprocessing library, and more specifically the starmap function.
The code goes like this:
from sklearn import metrics
import lightgbm as lgb
import numpy as np

def init_pool():
    from threading import current_thread
    ident = current_thread().ident
    np.random.seed(ident)

def train_model(params, Xt, yt, Xv, yv):
    model = lgb.LGBMClassifier(objective='binary', subsample=0.8, random_state=123, **params)
    model.fit(Xt, yt)
    proba = model.predict_proba(Xv)[:, 1]
    return metrics.roc_auc_score(yv, proba)

if __name__ == "__main__":
    from sklearn.model_selection import train_test_split
    from itertools import product, repeat
    import multiprocessing as mp
    from time import time
    import pandas as pd

    def generate_data(n):
        '''Generates random data'''
        df = pd.DataFrame({
            'x1': np.random.random(n) * 100,
            'x2': np.random.choice(['a', 'b', 'c', 'd'], n, replace=True),
            'x3': np.random.choice(['cow', 'platypus', 'koala', 'panda', 'camel'], n, replace=True),
            'x4': np.random.poisson(15, n),
            'y': np.random.choice([0, 1], n, replace=True, p=[0.8, 0.2])
        })
        # Necessary steps for lightgbm
        for _ in df.columns:
            if df[_].dtypes == 'object':
                df[_] = df[_].astype('category')
        X, y = df.drop(['y'], axis=1), df['y']
        return train_test_split(X, y, test_size=0.3, stratify=y)

    def grid_to_list(grid):
        '''Parameter grid is converted to a list of all combinations'''
        keys, values = zip(*grid.items())
        return [dict(zip(keys, v)) for v in product(*values)]

    param_list = grid_to_list({
        'num_leaves': [20, 30, 40],
        'learning_rate': [0.1, 0.3],
        'n_estimators': [50, 100, 250]
    })

    n = 100_000
    Xt, Xv, yt, yv = generate_data(n=n)
    pool_size = min(mp.cpu_count(), len(param_list))

    start = time()
    p = mp.Pool(pool_size, initializer=init_pool)
    ROC = p.starmap(train_model, zip(param_list, repeat(Xt), repeat(yt), repeat(Xv), repeat(yv)))
    p.close()
    p.join()
    end = time()

    print(f"Total running time for {len(param_list)} combinations: {round(end - start, 0)} seconds.")
    print(f"Highest ROC AUC score: {np.max(ROC)}")
    print(f"Matching parameters: {param_list[np.argmax(ROC)]}")
Running this on my personal laptop outputs the following:
Total running time for 18 combinations: 24.0 seconds.
Highest ROC AUC score: 0.5079410814800223
Matching parameters: {'num_leaves': 30, 'learning_rate': 0.3, 'n_estimators': 50}
So my first question is:
- Why won't it run on Azure Databricks?
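For what it's worth, even a stripped-down check along these lines (my own minimal sketch, with a placeholder function of mine standing in for train_model, nothing to do with lightgbm) should tell whether multiprocessing itself is usable from a Databricks notebook:
import multiprocessing as mp

def square_plus(x, offset):
    # Trivial stand-in for train_model: takes two arguments, returns one number
    return x * x + offset

if __name__ == "__main__":
    with mp.Pool(4) as p:
        out = p.starmap(square_plus, [(i, 1) for i in range(8)])
    print(out)  # expected: [1, 2, 5, 10, 17, 26, 37, 50]
If even that fails in a notebook cell, the problem presumably lies with how the driver handles worker processes rather than with my training code.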
Now, poking around a bit looking for alternatives, I was told about "resilient distributed datasets" (RDDs) and, after some effort, managed to get the following to work:
from sklearn.model_selection import train_test_split
from itertools import product, repeat
import multiprocessing as mp
from sklearn import metrics
import lightgbm as lgb
from time import time
import pandas as pd
import numpy as np

def generate_data(n):
    df = pd.DataFrame({
        'x1': np.random.random(n) * 100,
        'x2': np.random.choice(['a', 'b', 'c', 'd'], n, replace=True),
        'x3': np.random.choice(['cow', 'platypus', 'koala', 'panda', 'camel'], n, replace=True),
        'x4': np.random.poisson(15, n),
        'y': np.random.choice([0, 1], n, replace=True, p=[0.8, 0.2])
    })
    # Necessary steps for lightgbm
    for _ in df.columns:
        if df[_].dtypes == 'object':
            df[_] = df[_].astype('category')
    X, y = df.drop(['y'], axis=1), df['y']
    return train_test_split(X, y, test_size=0.3, stratify=y)

n = 100_000
Xt, Xv, yt, yv = generate_data(n=n)

def grid_to_list(grid):
    '''Parameter grid is converted to a list of all combinations'''
    keys, values = zip(*grid.items())
    return [dict(zip(keys, v)) for v in product(*values)]

param_list = grid_to_list({
    'num_leaves': [20, 30, 40],
    'learning_rate': [0.1, 0.3],
    'n_estimators': [50, 100, 250]
})
class HyperparameterOptimiser:
    def __init__(self, params, Xt, yt, Xv, yv, train_fct):
        self.param_list = params
        self.Xt = Xt
        self.yt = yt
        self.Xv = Xv
        self.yv = yv
        self.train_fct = train_fct

    def optimise(self, n_jobs=None):
        if n_jobs is None:
            n_jobs = min(len(self.param_list), 4 * 16)  # Why 4 * 16?
        start = time()
        # <BEGIN ANNOYING SECTION>
        train_fct = self.train_fct
        Xt = self.Xt
        yt = self.yt
        Xv = self.Xv
        yv = self.yv
        rdd = sc.parallelize(self.param_list, n_jobs)
        self.ROC = rdd.map(lambda p: train_fct(p, Xt, yt, Xv, yv)).collect()
        # <END ANNOYING SECTION>
        self.running_time = round(time() - start, 0)
        self.output_results()

    def output_results(self):
        print(f"Total running time for {len(self.param_list)} combinations: {self.running_time} seconds.")
        print(f"Highest ROC AUC score: {max(self.ROC)}")
        print(f"Matching parameters: {self.param_list[np.argmax(self.ROC)]}")
def train_model(params, Xt, yt, Xv, yv):
    model = lgb.LGBMClassifier(objective='binary', subsample=0.8, random_state=123, **params)
    model.fit(Xt, yt)
    predictions = model.predict_proba(Xv)[:, 1]
    return metrics.roc_auc_score(yv, predictions)

# Note: very useful to be able to pass whatever "train function" is warranted with regard to context
ho = HyperparameterOptimiser(param_list, Xt, yt, Xv, yv, train_model)
ho.optimise()
In this case, the output is the following:
Total running time for 18 combinations: 356.0 seconds.
Highest ROC AUC score: 0.5065868367986968
Matching parameters: {'num_leaves': 20, 'learning_rate': 0.3, 'n_estimators': 100}
This, however, raises more questions than answers:
- Why is it so much slower?
- Why must I pass every argument individually (please see the "ANNOYING SECTION" in the code comments), and not through the self object, as I would have with the starmap function in the first case? (See the sketch just after these questions for what I mean.)
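To make that second question concrete, what I would have liked the optimise method to look like is roughly the following (a reconstruction of the idea, not the exact code I ran):
    def optimise(self, n_jobs=None):
        if n_jobs is None:
            n_jobs = min(len(self.param_list), 4 * 16)
        start = time()
        rdd = sc.parallelize(self.param_list, n_jobs)
        # Referencing self inside the lambda: no local copies of the attributes needed
        self.ROC = rdd.map(lambda p: self.train_fct(p, self.Xt, self.yt, self.Xv, self.yv)).collect()
        self.running_time = round(time() - start, 0)
        self.output_results()
My (possibly wrong) understanding is that Spark then has to serialise the entire HyperparameterOptimiser instance in order to ship the lambda to the executors, which is where things go wrong for me.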
I am guessing part of the answer to question no. 2 has to do with my choice of cluster, relative to the specs of my personal computer. While I agree with that, the code is far from intensive, and I find it somewhat puzzling that the difference would be that large.
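One thing I have not ruled out is the cost of shipping the four pandas objects to the executors with every task. If that turns out to matter, my understanding is that Spark broadcast variables are the usual remedy; the snippet below is an untested sketch on my side, not something I have benchmarked:
# Untested sketch: broadcast the data once instead of capturing it in the closure
bXt, byt = sc.broadcast(Xt), sc.broadcast(yt)
bXv, byv = sc.broadcast(Xv), sc.broadcast(yv)

rdd = sc.parallelize(param_list, len(param_list))
ROC = rdd.map(lambda p: train_model(p, bXt.value, byt.value, bXv.value, byv.value)).collect()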
Hopefully, this will generate discussions that'll be helpful to others as well. Cheers.