
I am trying to execute a grid search on a Spark cluster with the spark-sklearn library. For this reason, I run nohup ./spark_python_shell.sh > output.log & at my bash shell to start Spark and get my Python script running (see the spark-submit --master yarn 'rforest_grid_search.py' call below):

    SPARK_HOME=/u/users/******/spark-2.3.0 \
    Q_CORE_LOC=/u/users/******/****** \
    ENV=local \
    HIVE_HOME=/usr/hdp/current/hive-client \
    SPARK2_HOME=/u/users/******/spark-2.3.0 \
    HADOOP_CONF_DIR=/etc/hadoop/conf \
    HIVE_CONF_DIR=/etc/hive/conf \
    HDFS_PREFIX=hdfs:// \
    PYTHONPATH=/u/users/******/******/python-lib:/u/users/******/******/python-lib:/u/users/******/pyenv/prod_python_libs/lib/python2.7/site-packages/:$PYTHON_PATH \
    YARN_HOME=/usr/hdp/current/hadoop-yarn-client \
    SPARK_DIST_CLASSPATH=$(hadoop classpath):$(yarn classpath):/etc/hive/conf/hive-site.xml \
    PYSPARK_PYTHON=/usr/bin/python2.7 \
    QQQ_LOC=/u/users/******/three-queues \
    spark-submit \
    --master yarn 'rforest_grid_search.py' \
    --executor-memory 10g \
    --num-executors 8 \
    --executor-cores 10 \
    --conf spark.port.maxRetries=80 \
    --conf spark.dynamicAllocation.enabled=False \
    --conf spark.default.parallelism=6000 \
    --conf spark.sql.shuffle.partitions=6000 \
    --principal ************************ \
    --queue default \
    --name lets_get_starting \
    --keytab /u/users/******/.******.keytab \
    --driver-memory 10g

The rforest_grid_search.py Python script contains the following source code, which tries to connect the grid search to the Spark cluster:

    # Spark configuration
    from pyspark import SparkContext, SparkConf

    conf = SparkConf()
    sc = SparkContext(conf=conf)
    print('Spark Context:', sc)

    # Hyperparameters' grid
    parameters = {'n_estimators': list(range(150, 200, 25)),
                  'criterion': ['gini', 'entropy'],
                  'max_depth': list(range(2, 11, 2)),
                  'max_features': [i/10. for i in range(10, 16)],
                  'class_weight': [{0: 1, 1: i/10.} for i in range(10, 17)],
                  'min_samples_split': list(range(2, 7))}

    # Execute grid search - using the spark_sklearn library
    from spark_sklearn import GridSearchCV
    from sklearn.ensemble import RandomForestClassifier

    # X and y are built earlier in the script (omitted here)
    classifiers_grid = GridSearchCV(sc, estimator=RandomForestClassifier(), param_grid=parameters,
                                    scoring='precision', cv=5, n_jobs=-1)
    classifiers_grid.fit(X, y)

When I run the Python script, I get an error at the line classifiers_grid.fit(X, y):

    ImportError: No module named model_selection._validation

or, in somewhat greater detail (but without including everything, because it is too long):

...
    ('Spark Context:', <SparkContext master=yarn appName=rforest_grid_search.py>)
...
    18/10/24 12:43:50 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, oser404637.*****.com, executor 2, partition 2, PROCESS_LOCAL, 42500 bytes)
    18/10/24 12:43:50 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, oser404637.*****.com, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/worker.py", line 216, in main
        func, profiler, deserializer, serializer = read_command(pickleSer, infile)
      File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/worker.py", line 58, in read_command
        command = serializer._read_with_length(file)
      File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
        return self.loads(obj)
      File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/serializers.py", line 562, in loads
        return pickle.loads(obj)
    ImportError: No module named model_selection._validation

            at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
...

When I ran the same Python script, slightly modified in terms of the cross-validation, I got the following error:

    Traceback (most recent call last):
      File "/data/users/******/rforest_grid_search.py", line 126, in <module>
        classifiers_grid.fit(X, y)
      File "/usr/lib/python2.7/site-packages/spark_sklearn/grid_search.py", line 274, in fit
        return self._fit(X, y, groups, ParameterGrid(self.param_grid))
      File "/usr/lib/python2.7/site-packages/spark_sklearn/grid_search.py", line 321, in _fit
        indexed_out0 = dict(par_param_grid.map(fun).collect())
      File "/u/users/******/spark-2.3.0/python/lib/pyspark.zip/pyspark/rdd.py", line 824, in collect
      File "/u/users/******/spark-2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
      File "/u/users/******/spark-2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 7, oser402389.wal-mart.com, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/worker.py", line 216, in main
        func, profiler, deserializer, serializer = read_command(pickleSer, infile)
      File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/worker.py", line 58, in read_command
        command = serializer._read_with_length(file)
      File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
        return self.loads(obj)
      File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/serializers.py", line 562, in loads
        return pickle.loads(obj)
    ImportError: No module named sklearn.base

How can I fix this and perform GridSearchCV on the Spark cluster?

Does this error simply mean that scikit-learn and/or spark-sklearn are not installed on the Spark worker nodes (even though apparently they are installed on the Spark edge/driver node which I am using to connect to the Spark cluster)?

1 Answer

Does this error simply mean that scikit-learn and/or spark-sklearn is not installed on the Spark worker nodes

Yes, it suggests exactly that, or to be more precise, that the modules are not present on the path of the Python interpreter used by your Spark workers.
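
To confirm this before changing anything, you can ask the executors themselves which interpreter they run and whether they can import sklearn. The following is only a minimal diagnostic sketch (not part of the original question or answer); it assumes the sc created in rforest_grid_search.py:

    # Minimal diagnostic sketch: report, per executor, the hostname, the Python
    # binary in use, and the scikit-learn version (None if the import fails).
    # Assumes the SparkContext sc created in rforest_grid_search.py.
    def probe(_):
        import socket, sys
        try:
            import sklearn
            version = sklearn.__version__
        except ImportError:
            version = None
        yield (socket.gethostname(), sys.executable, version)

    print(sc.parallelize(range(100), 20).mapPartitions(probe).distinct().collect())

If the version comes back as None, or sys.executable is not the interpreter you installed the packages into (PYSPARK_PYTHON=/usr/bin/python2.7 in the spark-submit above), that node is where the ImportError originates.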

In the general case, all modules used by worker-side code have to be accessible on each node. There are different options, depending on the complexity of the dependencies:

  • Install all the dependencies on each node, or in the container (if one is used). Often preferred, as there is no runtime overhead and one can use optimized native libraries where applicable (crucial for high-performance machine learning).
  • Use the --py-files option (or SparkContext.addPyFile) to distribute packages (usually eggs or zips) along with the task, as shown in the sketch after this list. Good for simple, plain-Python dependencies which don't require compiling and have no native dependencies.
  • Distribute complete virtual environments (for example conda environments) with local dependencies. This can work in simple cases, but has high overhead (large archives distributed with each task), won't work on a mixed-architecture cluster, and uses non-optimized native dependencies.
  • Install Python dependencies (if native ones are present) from within the task - see Numpy and static linking.
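
As a concrete illustration of the py-files option: pure-Python packages such as spark_sklearn can be shipped to the executors at runtime with SparkContext.addPyFile (the programmatic equivalent of --py-files). This is only a sketch under the assumption that you have zipped the installed spark_sklearn package yourself; the archive path below is hypothetical. It does not cover scikit-learn, NumPy or SciPy, which contain compiled code and therefore have to be installed on every worker node for the interpreter pointed to by PYSPARK_PYTHON (the first option above).

    # Sketch of the py-files approach for a pure-Python dependency.
    # Assumption: spark_sklearn.zip was built by zipping the installed
    # spark_sklearn package directory; the path below is hypothetical.
    # scikit-learn / NumPy / SciPy are NOT covered by this - they contain
    # compiled extensions and must be installed on every worker node.
    from pyspark import SparkContext, SparkConf

    sc = SparkContext(conf=SparkConf())                   # or reuse the existing sc
    sc.addPyFile('/u/users/me/deps/spark_sklearn.zip')    # hypothetical path

    # From this point on, worker-side code can import spark_sklearn from the
    # shipped archive; "ImportError: No module named sklearn.base" will only
    # go away once scikit-learn itself is present on each node.

The same archive can instead be passed once on the spark-submit line with --py-files, which avoids hard-coding the path in the script.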
  • Hm, this answer looks pretty interesting (upvote). However, it is a bit high-level, and I am not a data engineer, so I cannot easily translate it into low-level actions. For a start, though, we can say that we have identified why I was getting this error, even though I am not sure how I can fix it by myself. – Outcast Oct 25 '18 at 12:49