I'm trying to run ML trials in parallel using HyperOpt with SparkTrials on Databricks.

My objective function converts its outputs to a Spark DataFrame using spark.createDataFrame(results), so that I can reuse some preprocessing code I've previously written (I'd prefer not to have to rewrite it).

However, this causes an error when used with HyperOpt and SparkTrials, because the SparkContext used to create the DataFrame "should only be created or accessed on the driver". Is there any way I can create a Spark DataFrame inside my objective function here?

For a reproducible example:

from sklearn.datasets import load_iris
from sklearn.model_selection import cross_val_score
from sklearn.svm import SVC

from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK, Trials

from pyspark.sql import SparkSession

# If you are running Databricks Runtime for Machine Learning, `mlflow` is already installed and you can skip the following line.
import mlflow

# Load the iris dataset from scikit-learn
iris = load_iris()
X = iris.data
y = iris.target

def objective(C):
    # Create a support vector classifier model
    clf = SVC(C=C)
    # THESE TWO LINES CAUSE THE PROBLEM
    ss = SparkSession.builder.getOrCreate()
    sdf = ss.createDataFrame([('Alice', 1)])

    # Use the cross-validation accuracy to compare the models' performance
    accuracy = cross_val_score(clf, X, y).mean()
    
    # Hyperopt tries to minimize the objective function. A higher accuracy value means a better model, so you must return the negative accuracy.
    return {'loss': -accuracy, 'status': STATUS_OK}

search_space = hp.lognormal('C', 0, 1.0)
algo=tpe.suggest

# THIS WORKS (It's not using SparkTrials)
argmin = fmin(
  fn=objective,
  space=search_space,
  algo=algo,
  max_evals=16)

from hyperopt import SparkTrials

spark_trials = SparkTrials()

# THIS FAILS
argmin = fmin(
  fn=objective,
  space=search_space,
  algo=algo,
  max_evals=16,
  trials=spark_trials)

I have looked at the related question "How can I get the current SparkSession in any place of the codes?", but it solves a different problem and I can't see an obvious way to apply it to my situation.

s_pike

1 Answer

I think the short answer is that it's not possible. The SparkContext can only exist on the driver node, and with SparkTrials the objective function runs on the workers. Creating a new instance there would be a kind of nesting; see this related question:

Nesting parallelizations in Spark? What's the right approach?

I solved my problem in the end by rewriting the transformations in pandas, which works inside the objective function because pandas does not need a SparkContext.
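As a rough illustration of that approach, here is a minimal sketch based on the example in the question. The preprocess helper and its column names are hypothetical stand-ins for the real preprocessing code, and run_search is just a wrapper I've added around the original fmin call; I've only assumed the same iris/SVC setup as above.

```python
import pandas as pd


def preprocess(results):
    """Hypothetical pandas stand-in for the Spark preprocessing step."""
    df = pd.DataFrame(results, columns=["name", "value"])
    # Example transformation only: total value per name.
    return df.groupby("name", as_index=False)["value"].sum()


def objective(C):
    # Imported inside the function so each worker resolves them locally.
    from hyperopt import STATUS_OK
    from sklearn.datasets import load_iris
    from sklearn.model_selection import cross_val_score
    from sklearn.svm import SVC

    X, y = load_iris(return_X_y=True)
    clf = SVC(C=C)

    # pandas needs no SparkSession, so this call is safe on a worker.
    pdf = preprocess([("Alice", 1), ("Alice", 2), ("Bob", 3)])

    accuracy = cross_val_score(clf, X, y).mean()
    # Hyperopt minimises the loss, so return the negative accuracy.
    return {"loss": -accuracy, "status": STATUS_OK}


def run_search(max_evals=16):
    # Hypothetical wrapper; call this on the driver (e.g. in a notebook cell).
    from hyperopt import SparkTrials, fmin, hp, tpe

    return fmin(
        fn=objective,
        space=hp.lognormal("C", 0, 1.0),
        algo=tpe.suggest,
        max_evals=max_evals,
        trials=SparkTrials(),
    )
```

Calling run_search() on the driver should then distribute the trials as before, with no Spark objects touched inside objective.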

If the transformations are too big for a single node, then you'd probably have to pre-compute them on the driver before calling fmin and let hyperopt choose which pre-computed version to use as part of the optimisation.
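I haven't needed this myself, but a sketch of the idea might look like the following. Everything here is an assumption for illustration: the placeholder data, the two variant names, and the select_features and run_search helpers. In practice each variant would be produced by the Spark preprocessing on the driver and converted to something the workers can use (for example with toPandas()).

```python
import numpy as np

# Placeholder data standing in for the output of the real preprocessing.
rng = np.random.default_rng(0)
raw = rng.normal(loc=5.0, scale=2.0, size=(100, 4))
labels = (raw[:, 0] > 5.0).astype(int)

# Pre-computed once on the driver, before fmin is called.
VARIANTS = {
    "raw": raw,
    "standardised": (raw - raw.mean(axis=0)) / raw.std(axis=0),
}


def select_features(params):
    """Look up the pre-computed variant chosen by hyperopt."""
    return VARIANTS[params["variant"]]


def objective(params):
    from hyperopt import STATUS_OK
    from sklearn.model_selection import cross_val_score
    from sklearn.svm import SVC

    X = select_features(params)
    clf = SVC(C=params["C"])
    accuracy = cross_val_score(clf, X, labels).mean()
    return {"loss": -accuracy, "status": STATUS_OK}


def run_search(max_evals=16):
    # Hypothetical wrapper; call this on the driver.
    from hyperopt import SparkTrials, fmin, hp, tpe

    space = {
        # The variant name is tuned alongside the model hyperparameter.
        "variant": hp.choice("variant", list(VARIANTS)),
        "C": hp.lognormal("C", 0, 1.0),
    }
    return fmin(fn=objective, space=space, algo=tpe.suggest,
                max_evals=max_evals, trials=SparkTrials())
```

The variants are captured by the objective function and shipped to the workers with it, so this only makes sense if each pre-computed result is small enough to fit on a worker.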

s_pike