
I am running a Python program with Keras (performing a grid search over a deep neural network) on a Spark cluster, and I want to make sure that my results are reproducible.

My Spark cluster is launched by running the following .sh script:

SPARK_HOME=/u/users/******/spark-2.3.0 \
Q_CORE_LOC=/u/users/******/q-core \
ENV=local \
HIVE_HOME=/usr/hdp/current/hive-client \
SPARK2_HOME=/u/users/******/spark-2.3.0 \
HADOOP_CONF_DIR=/etc/hadoop/conf \
HIVE_CONF_DIR=/etc/hive/conf \
HDFS_PREFIX=hdfs:// \
PYTHONPATH=/u/users/******/q-core/python-lib:/u/users/******/three-queues/python-lib:/u/users/******/pyenv/prod_python_libs/lib/python2.7/site-packages/:$PYTHON_PATH \
YARN_HOME=/usr/hdp/current/hadoop-yarn-client \
SPARK_DIST_CLASSPATH=$(hadoop classpath):$(yarn classpath):/etc/hive/conf/hive-site.xml \
PYSPARK_PYTHON=/usr/bin/python2.7 \
QQQ_LOC=/u/users/******/three-queues \
spark-submit \
--master yarn 'dnn_grid_search.py' \
--executor-memory 10g \
--num-executors 8 \
--executor-cores 10 \
--conf spark.port.maxRetries=80 \
--conf spark.dynamicAllocation.enabled=False \
--conf spark.default.parallelism=6000 \
--conf spark.sql.shuffle.partitions=6000 \
--principal ************************ \
--queue default \
--name lets_get_starting \
--keytab /u/users/******/.******.keytab \
--driver-memory 10g

By running `nohup ./spark_python_shell.sh > output.log &` at my bash shell, I launch the Spark job and get my Python/Keras script running (see `spark-submit --master yarn 'dnn_grid_search.py'` above).

To ensure reproducibility of the results, I tried to do what I had successfully done on my laptop's CPU to get reproducible results (see also my answers on StackOverflow: answer_1, answer_2):

# Seed value
# Apparently you may use different seed values at each stage
seed_value = 0

# 1. Set `PYTHONHASHSEED` environment variable at a fixed value
import os
os.environ['PYTHONHASHSEED']=str(seed_value)

# 2. Set `python` built-in pseudo-random generator at a fixed value
import random
random.seed(seed_value)

# 3. Set `numpy` pseudo-random generator at a fixed value
import numpy as np
np.random.seed(seed_value)

# 4. Set `tensorflow` pseudo-random generator at a fixed value
import tensorflow as tf
tf.set_random_seed(seed_value)

# 5. Configure a new global `tensorflow` session
from keras import backend as K
session_conf = tf.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)
sess = tf.Session(graph=tf.get_default_graph(), config=session_conf)
K.set_session(sess)

However, when I include part (5), I am fairly sure that the program is not running properly, because some print statements in it no longer write anything to the output.log file.

Specifically, I created a very simple custom scorer function and inserted a print statement into it so that I can see whether the grid search is actually running:

def precision_recall(y_true, y_predict):

    # Print time
    from datetime import datetime
    time_now = datetime.utcnow()
    print('Grid Search time now:', time_now)

    from sklearn.metrics import precision_recall_fscore_support
    _, recall, _, _ = precision_recall_fscore_support(y_true, y_predict, average='binary')

    return recall

from sklearn.metrics import make_scorer
custom_scorer = make_scorer(score_func=precision_recall, greater_is_better=True, needs_proba=False)

# Execute grid search
# Notice 'n_jobs=-1' for parallelizing the jobs (across the cluster)
from sklearn.model_selection import GridSearchCV
classifiers_grid = GridSearchCV(estimator=classifier, param_grid=parameters, scoring=custom_scorer, cv=5, n_jobs=-1)
classifiers_grid.fit(X, y)
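
For context, `classifier` and `parameters` in the snippet above are constructed roughly along these lines (a hypothetical sketch using `keras.wrappers.scikit_learn.KerasClassifier`; the real architecture and parameter grid are larger, and `X` is the training matrix passed to `classifiers_grid.fit(X, y)`):

from keras.models import Sequential
from keras.layers import Dense
from keras.wrappers.scikit_learn import KerasClassifier

def build_dnn(optimizer='adam', units=32):
    # Hypothetical architecture - the real network is deeper
    model = Sequential()
    model.add(Dense(units, activation='relu', input_dim=X.shape[1]))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    return model

# The scikit-learn wrapper makes the Keras model usable as a GridSearchCV estimator
classifier = KerasClassifier(build_fn=build_dnn, epochs=10, batch_size=128, verbose=0)
parameters = {'optimizer': ['adam', 'rmsprop'], 'units': [32, 64]}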

While the print statement inside the custom scorer function (precision_recall) printed the time correctly when I was running other ML algorithms (random forest etc.), nothing is printed once I use Keras/TensorFlow together with the seeding and session code above.
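
One thing worth ruling out first (a small sketch, nothing Keras- or Spark-specific; it only assumes that the missing lines could be ordinary stdout buffering under `nohup` rather than the program actually hanging) is to flush stdout explicitly right after the print inside the scorer:

import sys
from datetime import datetime

# Inside precision_recall(), right after the existing print:
print('Grid Search time now:', datetime.utcnow())
sys.stdout.flush()  # push the line into output.log even if stdout is block-buffered under nohup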

Therefore, my question is: how can I get reproducible results with Keras/TensorFlow on Spark?

Outcast
  • As far as I can tell this code doesn't utilize Spark's executors, only the driver. – zero323 Oct 22 '18 at 15:59
  • @user6910411, I am not experienced enough in Spark to understand what this really implies. Just to mention two things. Firstly, I use SuperPutty to connect to an edge node and then, in bash as that user, I run the `nohup` command. Secondly, for example, when doing grid search with Xgboost I could get results within a couple of hours for 50,000 different grid search combinations. Therefore, I have the impression that I do use the executors, but I may be wrong. – Outcast Oct 22 '18 at 16:09
  • What you've shared is plain, non-distributed code. Even if you submit it with `spark-submit`, it will stay exactly that - there is no Spark magic that could turn it into a distributed program. It is of course possible that your question is missing some important bits and you actually use Spark distributed primitives to run individual components - if that's the case, please [edit] your question and include this so that prospective answers have the full picture. – zero323 Oct 22 '18 at 16:20
  • Each of the libraries you seem to use has some form of Spark bindings (tensorframes, xgboost4j-spark, spark-deep-learning, spark-sklearn and more), but once again, nothing in the code you've shared so far suggests you use any of these. – zero323 Oct 22 '18 at 16:21
  • Finally, I would strongly suggest focusing on a specific scenario - handling RNG state in a distributed application (if it really is distributed) can be quite complex, especially in the [guest languages](https://stackoverflow.com/q/31900124), and on top of that there are other sources of non-determinism which significantly limit your ability to provide fully reproducible results. – zero323 Oct 22 '18 at 16:26
  • @user6910411, Yes, that's true, I am not using any of the Spark bindings; however, it is quite difficult for me to believe that I am not automatically using the Spark executors (by setting `n_jobs=-1`) when, as I said, doing grid search with Xgboost I could get results within a couple of hours for 50,000 (!!) different grid search combinations. – Outcast Oct 22 '18 at 16:32
  • @user6910411, Hence, personally, I am leaning more towards "It is of course possible that your question is missing some important bits, and you actually use Spark distributed primitives to run individual components", but I do not really know which these are, because someone else has configured the whole Spark cluster under the hood. – Outcast Oct 22 '18 at 16:36
  • `Xgboost` is very fast and it can utilize multiple cores on your driver node. It doesn't however mean that it is using the Spark cluster. In fact, if you don't use `XGBoostEstimator` and the code is structured as the snippets shown above, we can be sure that it doesn't. – zero323 Oct 22 '18 at 16:57
  • @user6910411, OK, but the same happened with random forest and with some neural networks that I ran. And how many cores does the driver have (I cannot see them in my `.sh` script) that I can manage to run such a large grid search in so short an amount of time? – Outcast Oct 22 '18 at 17:03
  • Each snippet you've included can utilize at least multiple cores, and, if available, GPUs, so it should easily utilize the resources of the driver node. However, a discussion like this seems somewhat unrelated to the question raised here. Finally, it is a bit futile without a [mcve], IMHO. – zero323 Oct 22 '18 at 17:11
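
To illustrate the spark-sklearn suggestion from the comments above: a minimal sketch of what a grid search that actually runs on the executors might look like, assuming the `spark_sklearn` package is installed on the cluster and reusing the `classifier`, `parameters`, `custom_scorer`, `X` and `y` objects defined in the question:

from pyspark import SparkContext
from spark_sklearn import GridSearchCV  # drop-in replacement for sklearn's GridSearchCV

sc = SparkContext.getOrCreate()

# Each parameter combination is evaluated as a Spark task on the executors
# instead of as a joblib worker on the driver.
classifiers_grid = GridSearchCV(sc, classifier, parameters, scoring=custom_scorer, cv=5)
classifiers_grid.fit(X, y)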

0 Answers