47

I'm having a problem using Python on Spark. My application has some dependencies, such as numpy, pandas, astropy, etc. I cannot use virtualenv to create an environment with all the dependencies, since the nodes on the cluster do not have any common mountpoint or filesystem besides HDFS. Therefore I am stuck with using spark-submit --py-files. I package the contents of site-packages in a ZIP file and submit the job with the --py-files=dependencies.zip option (as suggested in Easiest way to install Python dependencies on Spark executor nodes?). However, the nodes on the cluster still do not seem to see the modules inside, and they throw an ImportError such as this one when importing numpy:

File "/path/anonymized/module.py", line 6, in <module>
    import numpy
File "/tmp/pip-build-4fjFLQ/numpy/numpy/__init__.py", line 180, in <module>   
File "/tmp/pip-build-4fjFLQ/numpy/numpy/add_newdocs.py", line 13, in <module>
File "/tmp/pip-build-4fjFLQ/numpy/numpy/lib/__init__.py", line 8, in <module>
    #
File "/tmp/pip-build-4fjFLQ/numpy/numpy/lib/type_check.py", line 11, in <module>
File "/tmp/pip-build-4fjFLQ/numpy/numpy/core/__init__.py", line 14, in <module>
ImportError: cannot import name multiarray

When I switch to the virtualenv and use the local pyspark shell, everything works fine, so the dependencies are all there. Does anyone know what might cause this problem and how to fix it?

Thanks!

Andrej Palicka
  • Apparently, the problem lies in the fact that Python cannot import .so modules from .zip files (https://docs.python.org/2/library/zipimport.html). This means I need to somehow unpack the zip file on all the workers and then add the unpack location to sys.path on all the workers. I'll try it out and see how it goes. – Andrej Palicka Apr 07 '16 at 14:25
  • I'm having a similar issue. How did you resolve the problem? – ramhiser Sep 28 '16 at 23:02
  • In the end I convinced the administrators of the cluster to install the dependencies on the worker machines. Another solution would be to have a volume, mounted on all the machines, where the dependencies would reside. I don't know whether anything else can be done :-\ – Andrej Palicka Sep 29 '16 at 08:28
  • I finally got it to work. I'll post an answer below. – ramhiser Sep 29 '16 at 18:02

7 Answers

94

First off, I'll assume that your dependencies are listed in requirements.txt. To package and zip the dependencies, run the following at the command line:

pip install -t dependencies -r requirements.txt
cd dependencies
zip -r ../dependencies.zip .

Above, the cd dependencies command is crucial to ensure that the modules are at the top level of the zip file. Thanks to Dan Corin's post for the heads-up.
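
To see why the layout inside the archive matters, here is a minimal sketch in plain Python (no Spark involved). It builds two small zip files and checks which one the import system can use once the zip is on sys.path. The module name demo_dep and the file names flat.zip and nested.zip are made up for illustration.

```python
import importlib
import os
import sys
import tempfile
import zipfile


def build_zip(zip_path, arcname):
    """Write a tiny pure-Python module into zip_path under the given archive name."""
    with zipfile.ZipFile(zip_path, "w") as zf:
        zf.writestr(arcname, "ANSWER = 42\n")


def can_import_from(zip_path, module_name):
    """Return True if module_name is importable once zip_path is on sys.path."""
    sys.path.insert(0, zip_path)
    try:
        importlib.invalidate_caches()
        sys.modules.pop(module_name, None)
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False
    finally:
        # Leave the interpreter state as we found it.
        sys.path.remove(zip_path)
        sys.modules.pop(module_name, None)


workdir = tempfile.mkdtemp()
flat_zip = os.path.join(workdir, "flat.zip")
nested_zip = os.path.join(workdir, "nested.zip")

build_zip(flat_zip, "demo_dep.py")                 # module at the top level of the zip
build_zip(nested_zip, "dependencies/demo_dep.py")  # module one directory down

flat_ok = can_import_from(flat_zip, "demo_dep")
nested_ok = can_import_from(nested_zip, "demo_dep")
print(flat_ok, nested_ok)
```

The nested archive corresponds to zipping the dependencies directory from its parent, which buries every package one level too deep for a plain import to find.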

Next, submit the job via:

spark-submit --py-files dependencies.zip spark_job.py

The --py-files directive sends the zip file to the Spark workers but does not add it to the PYTHONPATH (source of confusion for me). To add the dependencies to the PYTHONPATH to fix the ImportError, add the following line to the Spark job, spark_job.py:

sc.addPyFile("dependencies.zip")
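
If you want to check whether the archive actually ended up on the import path, a small diagnostic along these lines may help. It is only a sketch: the helper name archive_on_sys_path and the example paths are hypothetical, and the check simply compares file names.

```python
import os
import sys


def archive_on_sys_path(archive_name, search_path=None):
    """Return the import-path entries whose file name equals archive_name."""
    entries = sys.path if search_path is None else search_path
    return [entry for entry in entries if os.path.basename(entry) == archive_name]


# Example with an explicit, made-up search path instead of the real sys.path.
fake_path = [
    "/tmp/spark-1234/userFiles/dependencies.zip",
    "/usr/lib/python3/site-packages",
]
matches = archive_on_sys_path("dependencies.zip", fake_path)
print(matches)
```

To inspect the executors rather than the driver, the same function could be called inside a task, for example sc.parallelize(range(2)).map(lambda _: archive_on_sys_path("dependencies.zip")).collect().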

A caveat from this Cloudera post:

An assumption that anyone doing distributed computing with commodity hardware must assume is that the underlying hardware is potentially heterogeneous. A Python egg built on a client machine will be specific to the client’s CPU architecture because of the required C compilation. Distributing an egg for a complex, compiled package like NumPy, SciPy, or pandas is a brittle solution that is likely to fail on most clusters, at least eventually.

Although the solution above does not build an egg, the same guideline applies.

ramhiser
  • did it also work with dependencies that had native code and were provided as `.so` files? I remember that I had most problems with those. Anyhow, I'll try this once I get the chance and if it'll work I'll accept it. – Andrej Palicka Sep 30 '16 at 13:00
  • Good question. I did not have any issues with `.so` files, but that may be because my dependencies are built inside of a Docker container. My Spark workers are also running inside Docker containers via Kubernetes, which gives me some level of homogeneity. I've started a [Spark + Kubernetes project](https://github.com/ramhiser/spark-kubernetes) to standardize solutions to this and other issues I've encountered. – ramhiser Sep 30 '16 at 13:12
  • The cluster I was trying to run on was homogeneous as far as I know. The main issue was that Python does not allow dynamic import of `.so` files, although it is possible that Spark has overcome that somehow. I'll try it out and get back to you. – Andrej Palicka Sep 30 '16 at 14:19
  • Excellent. I'd like to know if it works. I know that EMR on AWS is a typical deployment model for Spark, but I'm finding certain issues are a bit easier to handle via Docker + Kubernetes because of the (nearly) guaranteed homogeneity. – ramhiser Sep 30 '16 at 18:29
  • I tried these instructions in a clean Docker environment and they didn't resolve the issue. – Jussi Kujala Sep 09 '17 at 09:11
  • Has anyone been able in the meantime to find a resolution to import dependencies that have .so files in Spark? Thanks – pippobaudos Jul 13 '18 at 17:55
  • Does it work if you use virtualenv like I described in my detailed example here: https://stackoverflow.com/questions/48770263/bundling-python3-packages-for-pyspark-results-in-missing-imports/53623903#53623903 ? – devinbost Dec 05 '18 at 01:36
  • If I pass two zip files I am getting an error like `Exception in thread "main" org.apache.spark.SparkException: Cannot load main class from JAR file:/Users/fra/Documents/testSmartMobility/AnalysisResult/spark_dist/test_spark_submit-0.1-deps.zip at org.apache.spark.deploy.SparkSubmitArguments.error(SparkSubmitArguments.scala:657) at org.apache.spark.deploy.SparkSubmitArguments.loadEnvironmentArguments(SparkSubmitArguments.scala:221) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)`. Has anyone experienced the same? – roschach Dec 26 '18 at 21:47
  • The fact that `--py-files` does not add it to `PYTHONPATH` is not just a source of confusion for you --- it seems to be a general problem! – vy32 Jan 25 '19 at 19:06
  • Of all the possible solutions I found online for this problem, this is the best. And still, it feels like a hack. They should really release an option similar to `--packages` for Python dependencies as well! – ciurlaro Feb 09 '21 at 12:31
21
  • First, pass your files through --py-files or --files.

    • When you pass your zip/files with the above flags, your resources are transferred to a temporary directory created on HDFS just for the lifetime of that application.
  • Now, in your code, add those zip/files using the following command:

    sc.addPyFile("your zip/file")

    • This makes the file available as a dependency for the tasks run on this SparkContext, so that it can be imported there.
  • Now import your zip/file in your code with an alias, like the following, to start referencing it:

    import zip/file as your-alias

    Note: Omit the file extension (such as .py) when importing.

Hope this is useful.

avrsanjay
  • I always get ```ImportError: No module named zipfile```. Any suggestions? – Ivan Bilan Mar 06 '18 at 14:43
  • @ivan_bilan you have to replace zipfile with the name of your zipfile. – avrsanjay Mar 09 '18 at 18:17
  • Yeah, I did that. I wasn't able to do this using `sc.addPyFile`; instead I loaded the whole venv using --archives. More details here: https://stackoverflow.com/questions/49132151/elephas-not-loaded-in-pyspark-no-module-named-elephas-spark-model – Ivan Bilan Mar 11 '18 at 22:10
  • I may be writing this too late, but it might help someone. The module (folder) that you are trying to import must contain a blank file named `__init__.py`, or else this setup will not work. Everything else in this answer is as perfect as its number of upvotes. – desaiankitb Jul 18 '18 at 10:43
12

To get this dependency distribution approach to work with compiled extensions we need to do two things:

  1. Run the pip install on the same OS as your target cluster (preferably on the master node of the cluster). This ensures compatible binaries are included in your zip.
  2. Unzip your archive on the destination node. This is necessary since Python will not import compiled extensions from zip files. (https://docs.python.org/3.8/library/zipimport.html)
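
Before shipping an archive, it can be useful to know whether it contains compiled extensions at all, since those are the files that need the unzip step. The following is a rough sketch using only the standard library. The helper name find_compiled_extensions and the suffix list are assumptions for illustration, and the demo archive holds placeholder bytes rather than a real shared object.

```python
import os
import tempfile
import zipfile

# Suffixes treated as compiled extensions here. This list is an assumption
# of the sketch and is not exhaustive.
COMPILED_SUFFIXES = (".so", ".pyd", ".dylib")


def find_compiled_extensions(zip_path):
    """Return the sorted archive members that look like compiled extension modules."""
    with zipfile.ZipFile(zip_path) as zf:
        return sorted(name for name in zf.namelist() if name.endswith(COMPILED_SUFFIXES))


# Build a throwaway archive to demonstrate the check.
demo_zip = os.path.join(tempfile.mkdtemp(), "dependencies.zip")
with zipfile.ZipFile(demo_zip, "w") as zf:
    zf.writestr("numpy/__init__.py", "")
    zf.writestr("numpy/core/multiarray.so", b"placeholder bytes")

found = find_compiled_extensions(demo_zip)
print(found)
```

An empty result suggests the archive is pure Python and can probably be imported straight from the zip. Anything listed will not be importable until the archive is unpacked.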

Using the following script to create your dependencies zip will ensure that you are isolated from any packages already installed on your system. This assumes virtualenv is installed and requirements.txt is present in your current directory, and outputs a dependencies.zip with all your dependencies at the root level.

env_name=temp_env

# create the virtual env
virtualenv --python=$(which python3) --clear /tmp/${env_name}

# activate the virtual env
source /tmp/${env_name}/bin/activate

# download and install dependencies
pip install -r requirements.txt

# package the dependencies in dependencies.zip. the cd magic works around the fact that you can't specify a base dir to zip
(cd /tmp/${env_name}/lib/python*/site-packages/ && zip -r - *) > dependencies.zip

The dependencies can now be deployed, unzipped, and included in the PYTHONPATH like so:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf 'spark.yarn.dist.archives=dependencies.zip#deps' \
  --conf 'spark.yarn.appMasterEnv.PYTHONPATH=deps' \
  --conf 'spark.executorEnv.PYTHONPATH=deps' \
.
.
.

spark.yarn.dist.archives=dependencies.zip#deps
distributes your zip file and unzips it to a directory called deps

spark.yarn.appMasterEnv.PYTHONPATH=deps
spark.executorEnv.PYTHONPATH=deps
includes the deps directory in the PYTHONPATH for the master and all workers

--deploy-mode cluster
runs the driver on the cluster so that it picks up the dependencies
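
If you launch jobs from a script, the options above can be assembled programmatically. This is just a sketch: the function name build_submit_command is made up, the script name spark_job.py is a placeholder, and the arguments elided from the command above are not reconstructed here.

```python
def build_submit_command(script, archive="dependencies.zip", alias="deps"):
    """Assemble a spark-submit argument list with the three --conf options shown above."""
    return [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",
        # Ship the archive and unpack it into a directory named after the alias.
        "--conf", f"spark.yarn.dist.archives={archive}#{alias}",
        # Put the unpacked directory on PYTHONPATH for the application master and the executors.
        "--conf", f"spark.yarn.appMasterEnv.PYTHONPATH={alias}",
        "--conf", f"spark.executorEnv.PYTHONPATH={alias}",
        script,  # placeholder for your own job script and its arguments
    ]


command = build_submit_command("spark_job.py")
print(" ".join(command))
```

The resulting list can be passed to subprocess.run(command, check=True) on a machine where spark-submit is available.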

Dima
  • While not totally automated and probably not usable in an environment you don't manage yourself (such as a 3rd party cluster, which, if I recall correctly, was my original use-case) I think this answer is as good as it gets with dependency management of pyspark applications :-) Thanks! – Andrej Palicka Sep 22 '20 at 09:14
  • @AndrejPalicka - thanks for accepting even though I am 4 years too late :D – Dima Sep 22 '20 at 19:20
  • If you don't manage the cluster (and you can't ssh to the master node) you'd have to run the `pip install` on another machine with the same OS as your cluster to get the right binaries. Then upload the zip to S3 or another filesystem your cluster can talk to. Then use `spark.yarn.dist.archives=s3:///dependencies.zip#deps` to download your dependencies and unzip. – Dima Sep 22 '20 at 19:30
1

Try using --archives to ship your Anaconda directory to each server, and use --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON= to tell Spark where the Python executable is located inside your Anaconda directory.

Our full config is this:

--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./ANACONDA/anaconda-dependencies/bin/python 

--archives <S3-path>/anaconda-dependencies.zip#ANACONDA
Jin Lee
Graham
0

You can locate all the .py files you need and add them relative to your script. See here for the explanation:

import os, sys, inspect

# realpath() will make your script run, even if you symlink it :)
cmd_folder = os.path.realpath(os.path.abspath(os.path.split(inspect.getfile(inspect.currentframe()))[0]))
if cmd_folder not in sys.path:
    sys.path.insert(0, cmd_folder)

# use this if you want to include modules from a subfolder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile(inspect.currentframe()))[0], "subfolder")))
if cmd_subfolder not in sys.path:
    sys.path.insert(0, cmd_subfolder)

# Info:
# cmd_folder = os.path.dirname(os.path.abspath(__file__)) # DO NOT USE __file__ !!!
# __file__ fails if script is called in different ways on Windows
# __file__ fails if someone does os.chdir() before
# sys.argv[0] also fails because it does not always contain the path
kpie
  • I do not quite understand how this might help me with Spark and getting the dependencies to the worker nodes. Could you elaborate a bit more please? Thanks! – Andrej Palicka Apr 06 '16 at 20:18
  • If you download numpy and add it to your project, you can add the containing folder to the sys path and import it from there. It may not be the optimal solution, given the redundancy when you know that numpy is already on the nodes, but it means the nodes don't need to have it pre-installed. – kpie Apr 06 '16 at 21:35
0

Spark will also silently fail to load a zip archive that was created with the Python zipfile module. Zip archives must be created using a zip utility.

Nathan Buesgens
  • What is the problem introduced by zipfile? What is the right way to create the zip? – blue Aug 22 '17 at 16:48
0

As Andrej Palicka explained in the comments,

"the problem lies in the fact, that Python cannot import .so modules from .zip files (docs.python.org/2/library/zipimport.html)".

A solution that I found is to add the non-.py files one by one to --py-files, separated by commas:

spark-submit --py-files modules/toolbox.cpython-38-x86_64-linux-gnu.so,modules/product.cpython-38-x86_64-linux-gnu.so spark_from_cython.py
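
With more than a couple of compiled modules, typing the list by hand gets tedious. As a sketch, the comma-separated value could be generated from a directory listing. The helper name py_files_argument is made up, and the demo below creates empty stand-in files in a temporary modules directory rather than real shared objects.

```python
import tempfile
from pathlib import Path


def py_files_argument(directory, pattern="*.so"):
    """Join the files in directory that match pattern into a --py-files value."""
    paths = sorted(Path(directory).glob(pattern))
    return ",".join(p.as_posix() for p in paths)


# Create a throwaway "modules" directory with empty stand-in files.
modules_dir = Path(tempfile.mkdtemp()) / "modules"
modules_dir.mkdir()
for name in (
    "toolbox.cpython-38-x86_64-linux-gnu.so",
    "product.cpython-38-x86_64-linux-gnu.so",
):
    (modules_dir / name).write_bytes(b"")

value = py_files_argument(modules_dir)
print(value)
```

The printed string can then be passed as the argument to --py-files.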

godot