
I am trying to distribute Keras training on a cluster and am using Elephas (https://github.com/maxpumperla/elephas) for that. However, when I run the basic example from the Elephas docs:

```
from elephas.utils.rdd_utils import to_simple_rdd
rdd = to_simple_rdd(sc, x_train, y_train)

from elephas.spark_model import SparkModel
from elephas import optimizers as elephas_optimizers

sgd = elephas_optimizers.SGD()
spark_model = SparkModel(sc, model, optimizer=sgd, frequency='epoch', mode='asynchronous', num_workers=2)
spark_model.train(rdd, nb_epoch=epochs, batch_size=batch_size, verbose=1, validation_split=0.1)
```

I get the following error:

    ImportError: No module named elephas.spark_model

Full stack trace:
```
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5.0 (TID 58, xxxx, executor 8): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/xx/xx/hadoop/yarn/local/usercache/xx/appcache/application_151xxx857247_19188/container_1512xxx247_19188_01_000009/pyspark.zip/pyspark/worker.py", line 163, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/xx/xx/hadoop/yarn/local/usercache/xx/appcache/application_151xxx857247_19188/container_1512xxx247_19188_01_000009/pyspark.zip/pyspark/worker.py", line 54, in read_command
    command = serializer._read_with_length(file)
  File "/yarn/local/usercache/xx/appcache/application_151xxx857247_19188/container_1512xxx247_19188_01_000009/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/yarn//local/usercache/xx/appcache/application_151xxx857247_19188/container_1512xxx247_19188_01_000009/pyspark.zip/pyspark/serializers.py", line 454, in loads
    return pickle.loads(obj)
ImportError: No module named elephas.spark_model

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```

Also, the model is actually created; I can do `print(spark_model)` and get `<elephas.spark_model.SparkModel object at 0x7efce0abfcd0>`. The error only occurs during `spark_model.train`.

I installed Elephas using `pip2 install git+https://github.com/maxpumperla/elephas`; maybe this is relevant.

I use PySpark 2.1.1, Keras 2.1.4 and Python 2.7. I've tried running it with spark-submit:

PYSPARK_DRIVER_PYTHON=`which python` spark-submit --driver-memory 1G filename.py

I have also run it directly in a Jupyter notebook; both result in the same problem.

Can anyone give me any pointers? Is this Elephas-related, or is it a PySpark problem?

EDIT: I also tried zipping the virtual environment, uploading it, and adding it within the script:

virtualenv spark_venv --relocatable
cd spark_venv 
zip -qr ../spark_venv.zip *

PYSPARK_DRIVER_PYTHON=`which python` spark-submit --driver-memory 1G --py-files spark_venv.zip filename.py

Then in the file I do:

sc.addPyFile("spark_venv.zip")

After this, Keras imports without any problems, but I still get the elephas error from above.
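
For reference, a small executor-side check like the following (just a sketch; it assumes the SparkContext `sc` and the `spark_venv.zip` from above) shows whether the workers can import the module the traceback complains about:

```
# Sketch: run the failing import on the executors themselves, to see whether
# sc.addPyFile("spark_venv.zip") actually makes elephas importable there.
def check_elephas(_):
    try:
        import elephas.spark_model  # the module the traceback reports as missing
        return "ok"
    except ImportError as exc:
        return "ImportError: %s" % exc

print(sc.parallelize(range(2), 2).map(check_elephas).collect())
```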

– Ivan Bilan

2 Answers


I found a solution for how to properly load a virtual environment onto the master and all the worker nodes:

virtualenv venv --relocatable
cd venv 
zip -qr ../venv.zip *

PYSPARK_PYTHON=./SP/bin/python spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./SP/bin/python --driver-memory 4G --archives venv.zip#SP filename.py
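
A quick sanity check (just a sketch, not part of the submitted script; it assumes a SparkContext `sc` inside filename.py) is to ask the executors which interpreter they are running, which should be the one shipped via `--archives venv.zip#SP`:

```
def executor_python(_):
    # Runs on an executor and reports which Python binary is executing the task.
    import sys
    return sys.executable

# With PYSPARK_PYTHON=./SP/bin/python this should return paths ending in
# SP/bin/python rather than the system interpreter.
print(sc.parallelize(range(2), 2).map(executor_python).distinct().collect())
```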

More details in this GitHub issue: https://github.com/maxpumperla/elephas/issues/80#issuecomment-371073492

– Ivan Bilan

You should add the Elephas library as an argument to your spark-submit command.

Citing the official guide:

For Python, you can use the --py-files argument of spark-submit to add .py, .zip or .egg files to be distributed with your application. If you depend on multiple Python files we recommend packaging them into a .zip or .egg.

Official guide
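
If spark-submit is not convenient (for example in a notebook), a rough programmatic counterpart is `SparkContext.addPyFile`. A minimal sketch with a hypothetical archive path; the important detail is that the `elephas/` package directory must sit at the top level of the zip, because the archive itself is appended to `sys.path` on the executors:

```
from pyspark import SparkConf, SparkContext

sc = SparkContext.getOrCreate(SparkConf().setAppName("elephas-job"))
# Hypothetical path: a zip whose top level contains the elephas/ package
# (and its pure-Python dependencies), not a whole virtualenv directory tree.
sc.addPyFile("/path/to/elephas_and_deps.zip")
```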

– addmeaning
  • I zipped the env I use with elephas installed and included it with `--py-files venv.zip`; afterwards I loaded it into the `sc` with `sc.addPyFile("venv.zip")`. This didn't do anything, am I missing a step here? I've also found examples that tell you to `import` the venv after loading it into the `sc`, but doing `import venv` doesn't work and tells me that `venv` doesn't exist. – Ivan Bilan Mar 06 '18 at 14:29
  • The funny thing is, everything else seems to load just fine; Keras and other dependencies import without issues when I include the `venv.zip`, but I still get the `elephas` error I described above. – Ivan Bilan Mar 06 '18 at 14:54
  • @ivan_bilan Does it seem like venv.zip contains the required elephas files? Can you manually inspect the archive? – addmeaning Mar 06 '18 at 19:53