
I have the following folder structure

 - libfolder
    - lib1.py
    - lib2.py
 - main.py

main.py calls code in libfolder/lib1.py, which in turn calls libfolder/lib2.py and others.

It all works perfectly fine on my local machine, but after I deploy it to Dataproc I get the following error:

File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 455, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'libfolder'

I have zipped the folder into xyz.zip and run the following command:

spark-submit --py-files=xyz.zip main.py

The serializer is not able to find the location of libfolder. Is there a problem with the way I am packaging my folders?

This issue is similar to this one, but that question is unanswered.

Edit: response to Igor's questions

Running unzip -l on the zip file returns the following:

 - libfolder
    - lib1.py
    - lib2.py
 - main.py

In main.py, lib1 is imported with this statement:

from libfolder import lib1
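
For context, here is a minimal sketch of how such a setup typically fits together; the function names (process, transform) and the sample transformation are assumptions for illustration, not code from the original project. It is consistent with the traceback, where the executor fails to unpickle a function that references libfolder.

# libfolder/lib1.py (hypothetical contents)
from libfolder import lib2

def process(line):
    return lib2.transform(line)

# libfolder/lib2.py (hypothetical contents)
def transform(line):
    return line.upper()

# main.py (hypothetical usage): lib1.process is shipped to the executors,
# so libfolder must be importable on them as well
from pyspark import SparkContext
from libfolder import lib1

sc = SparkContext()
print(sc.parallelize(["a", "b"]).map(lib1.process).collect())
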
Golak Sarangi

3 Answers


This worked for me:

$ cat main.py

from pyspark import SparkContext, SparkConf

from subpkg import sub

conf = SparkConf().setAppName("Shell Count")
sc = SparkContext(conf = conf)

text_file = sc.textFile("file:///etc/passwd")
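# the lambdas below capture sub.map and sub.reduce, so subpkg must be
# importable on the executors when these closures are unpickled there
# (that is what shipping deps.zip via --py-files provides)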
counts = text_file.map(lambda line: sub.map(line)) \
    .map(lambda shell: (shell, 1)) \
    .reduceByKey(lambda a, b: sub.reduce(a, b))

counts.saveAsTextFile("hdfs:///count5.txt")

$ cat subpkg/sub.py

def map(line):
  return line.split(":")[6]

def reduce(a, b):
  return a + b

$ unzip -l /tmp/deps.zip 
Archive:  /tmp/deps.zip
  Length      Date    Time    Name
---------  ---------- -----   ----
        0  2019-01-07 14:22   subpkg/
        0  2019-01-07 13:51   subpkg/__init__.py
       79  2019-01-07 14:13   subpkg/sub.py
---------                     -------
       79                     3 files


$ gcloud dataproc jobs submit pyspark --cluster test-cluster main.py --py-files deps.zip
Job [1f0f15108a4149c5942f49513ce04440] submitted.
Waiting for job output...
Hello world!
Job [1f0f15108a4149c5942f49513ce04440] finished successfully.
tix
  • It works fine for noop scripts. I get the above error only if I use spark's reduce method – Golak Sarangi Jan 02 '19 at 10:55
  • Hmm, the trivial word-count program also works for me if I pull out map/reduce functions into another module. Could you share a code snippet? – tix Jan 07 '19 at 22:25

You don't have to pass a zip file using the --py-files parameter in spark-submit. You just need to import the package where the UDF function resides in your Python project. For example:

from package1.subpackage1.UDFPythonFile1 import UDF1

package1 sits in the same directory as the entry-point Python script (main.py) that you are executing:

  • main.py
  • package1
    • subpackage1
      • UDFPythonFile1.py

The advantage of this approach is that you avoid creating a zip file every time. It has worked for me on a Windows machine.
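
As a hedged sketch of this layout (the body of UDF1, the Spark session setup, and the sample column are illustrative assumptions, not code from the original answer):

# package1/subpackage1/UDFPythonFile1.py (hypothetical contents)
def UDF1(value):
    return value.strip().lower()

# main.py, executed from the directory that contains package1
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from package1.subpackage1.UDFPythonFile1 import UDF1

spark = SparkSession.builder.appName("udf-example").getOrCreate()
clean = udf(UDF1, StringType())
df = spark.createDataFrame([(" Foo ",)], ["raw"])
df.select(clean("raw").alias("cleaned")).show()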


For me, it works to put the --py-files value in quotes:

spark2-submit --master yarn --verbose --deploy-mode client \
  --py-files "dependencies.zip" $HOME/jobs/master/etl_master.py

Also pay attention to the order of the arguments. I would recommend adding --verbose first, then submitting the job again and checking the log to see which arguments your program is passing to the server.

Antonio Cachuan