I am running a Python program with Keras (performing a grid search over a deep neural network) on a Spark cluster, and I want to ensure that my results are reproducible.
I start the Spark job by running the following .sh script:
SPARK_HOME=/u/users/******/spark-2.3.0 \
Q_CORE_LOC=/u/users/******/q-core \
ENV=local \
HIVE_HOME=/usr/hdp/current/hive-client \
SPARK2_HOME=/u/users/******/spark-2.3.0 \
HADOOP_CONF_DIR=/etc/hadoop/conf \
HIVE_CONF_DIR=/etc/hive/conf \
HDFS_PREFIX=hdfs:// \
PYTHONPATH=/u/users/******/q-core/python-lib:/u/users/******/three-queues/python-lib:/u/users/******/pyenv/prod_python_libs/lib/python2.7/site-packages/:$PYTHONPATH \
YARN_HOME=/usr/hdp/current/hadoop-yarn-client \
SPARK_DIST_CLASSPATH=$(hadoop classpath):$(yarn classpath):/etc/hive/conf/hive-site.xml \
PYSPARK_PYTHON=/usr/bin/python2.7 \
QQQ_LOC=/u/users/******/three-queues \
spark-submit \
--master yarn \
--executor-memory 10g \
--num-executors 8 \
--executor-cores 10 \
--conf spark.port.maxRetries=80 \
--conf spark.dynamicAllocation.enabled=False \
--conf spark.default.parallelism=6000 \
--conf spark.sql.shuffle.partitions=6000 \
--principal ************************ \
--queue default \
--name lets_get_starting \
--keytab /u/users/******/.******.keytab \
--driver-memory 10g \
'dnn_grid_search.py'
By running nohup ./spark_python_shell.sh > output.log & in my bash shell, I launch the Spark job and get my Python/Keras script running (see the spark-submit invocation of 'dnn_grid_search.py' above).
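As a side note, because stdout is redirected to output.log, Python buffers it in blocks rather than line by line, so prints can show up late (or not at all if the process dies). A minimal flush-on-write wrapper rules that out (a sketch; the Unbuffered name is just for illustration):

import sys

class Unbuffered(object):
    """Wrap a stream so that every write is flushed immediately."""
    def __init__(self, stream):
        self.stream = stream
    def write(self, data):
        self.stream.write(data)
        self.stream.flush()
    def __getattr__(self, attr):
        # Delegate everything else to the wrapped stream
        return getattr(self.stream, attr)

sys.stdout = Unbuffered(sys.stdout)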
To make the results reproducible, I tried the same approach that worked on my laptop's CPU (see also my answers on Stack Overflow: answer_1, answer_2):
# Seed value
# Apparently you may use different seed values at each stage
seed_value = 0
# 1. Set `PYTHONHASHSEED` environment variable at a fixed value
import os
os.environ['PYTHONHASHSEED'] = str(seed_value)
# 2. Set `python` built-in pseudo-random generator at a fixed value
import random
random.seed(seed_value)
# 3. Set `numpy` pseudo-random generator at a fixed value
import numpy as np
np.random.seed(seed_value)
# 4. Set `tensorflow` pseudo-random generator at a fixed value
import tensorflow as tf
tf.set_random_seed(seed_value)
# 5. Configure a new global `tensorflow` session
from keras import backend as K
session_conf = tf.ConfigProto(intra_op_parallelism_threads=1,
                              inter_op_parallelism_threads=1)
sess = tf.Session(graph=tf.get_default_graph(), config=session_conf)
K.set_session(sess)
However, when I include part (5), I am fairly sure the program no longer runs properly, because some print statements in the script stop showing up in the output.log file.
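One way to narrow down whether part (5) itself is where execution stalls is to bracket it with flushed print markers, e.g. a sketch along these lines:

import sys

print('Before configuring the TF session')
sys.stdout.flush()

session_conf = tf.ConfigProto(intra_op_parallelism_threads=1,
                              inter_op_parallelism_threads=1)
sess = tf.Session(graph=tf.get_default_graph(), config=session_conf)
K.set_session(sess)

print('After configuring the TF session')
sys.stdout.flush()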
Specifically, I created a very simple custom scorer function and put a print statement inside it, so that I can see whether the grid search is actually running:
def precision_recall(y_true, y_predict):
    # Print the current time so progress shows up in the log
    from datetime import datetime
    time_now = datetime.utcnow()
    print('Grid Search time now:', time_now)

    # Return recall for the binary classification task
    from sklearn.metrics import precision_recall_fscore_support
    _, recall, _, _ = precision_recall_fscore_support(y_true, y_predict, average='binary')
    return recall
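# Quick sanity check of the scorer outside GridSearchCV (the label
# arrays are made up for illustration; recall here should be 0.5)
import numpy as np
print(precision_recall(np.array([0, 1, 1, 0]), np.array([0, 1, 0, 0])))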
from sklearn.metrics import make_scorer
custom_scorer = make_scorer(score_func=precision_recall, greater_is_better=True, needs_proba=False)
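For completeness: classifier and parameters below stand for a Keras model wrapped for scikit-learn and its parameter grid. A minimal sketch of how they can be built (the architecture, input_dim and grid values are illustrative assumptions, not my actual network):

from keras.models import Sequential
from keras.layers import Dense
from keras.wrappers.scikit_learn import KerasClassifier

def build_dnn(units=32):
    # Toy binary classifier; input_dim=10 is a placeholder
    model = Sequential()
    model.add(Dense(units, activation='relu', input_dim=10))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(optimizer='adam', loss='binary_crossentropy')
    return model

classifier = KerasClassifier(build_fn=build_dnn, epochs=5, batch_size=32, verbose=0)
parameters = {'units': [16, 32, 64]}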
# Execute grid search
# Notice 'n_jobs=-1' for parallelizing the jobs (across the cluster)
from sklearn.model_selection import GridSearchCV
classifiers_grid = GridSearchCV(estimator=classifier, param_grid=parameters, scoring=custom_scorer, cv=5, n_jobs=-1)
classifiers_grid.fit(X, y)
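After fit completes, the standard GridSearchCV attributes show which configuration won:

# Inspect the grid-search results
print('Best parameters:', classifiers_grid.best_params_)
print('Best recall:', classifiers_grid.best_score_)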
While the print statement inside the custom scorer function (precision_recall) printed the time correctly when I was running other ML algorithms (random forest etc.), when I try the same with Keras/TensorFlow and its seeding and session setup, nothing is printed.
Therefore, my question is: how can I get reproducible results with Keras/TensorFlow on Spark?