
I am testing existing code with Python 3.6, but a UDF that used to work with Python 2.7 no longer works as is, and I could not figure out where the issue is. Is anyone facing a similar issue, either locally or in distributed mode? It looks similar to https://github.com/mlflow/mlflow/issues/797

Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 202, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 219, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 139, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 119, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 59, in read_command
    command = serializer._read_with_length(file)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'project'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:52)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1992)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)

Driver stacktrace:
Avinash.Y

2 Answers

1. My project has a package with nested sub packages:
pkg
   subpkg1
          subpkg2
                .py
2. From my Main.py I call a UDF, which in turn calls a function in a .py file under subpkg2.
3. Because of the deep nesting, and because the UDF calls into many other functions, the Spark job could not find the subpkg2 files.

Solution:
Create an egg file of pkg and ship it with --py-files.
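As a rough illustration of the packaging step, the sketch below builds a zip archive rather than an egg (--py-files accepts .zip, .egg, and .py files). The helper name zip_package and the file names in the usage note are hypothetical. It assumes pkg and every sub package contain an __init__.py.

```python
import os
import zipfile


def zip_package(package_dir, zip_path):
    """Zip a package so that its own directory name is the top-level archive entry.

    zip_package is a hypothetical helper, not part of Spark or the original project.
    """
    package_dir = os.path.abspath(package_dir)
    parent = os.path.dirname(package_dir)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(package_dir):
            for name in files:
                if name.endswith(".py"):
                    full_path = os.path.join(root, name)
                    # Store paths relative to the parent directory so that
                    # "pkg/subpkg1/subpkg2/..." is preserved inside the archive.
                    archive.write(full_path, os.path.relpath(full_path, parent))
    return zip_path
```

Calling zip_package("pkg", "pkg.zip") from the project root should produce an archive that can be passed as spark-submit --py-files pkg.zip Main.py, or added at runtime with spark.sparkContext.addPyFile("pkg.zip"). Keeping pkg as the top-level entry is what lets imports of the form pkg.subpkg1.subpkg2 resolve from the archive.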
Avinash.Y

I had a similar situation and @Avinash's answer worked for me. If the sub package is nested below other packages and is referenced directly in the code, I had to create a separate zip file for the sub package module (subpkg2 in this case) and add it to the Spark context using addPyFile.

scripts
|__ analysis.py
pkg
|__ __init__.py
|__ subpkg1
    |__ __init__.py
    |__ subpkg2
        |__ __init__.py
        |__ file1.py
#########################
## scripts/analysis.py ##
#########################

import os
import sys

from pyspark.sql import SparkSession

# Add pkg to path
path = os.path.join(os.path.dirname(__file__), os.pardir)
sys.path.append(path)

# Sub package referenced directly
from subpkg2 import file1

...
...

spark = (
    SparkSession
    .builder  # builder is an attribute in PySpark, not a method
    .master("local[*]")
    .appName("some app")
    .getOrCreate()
)
# Need to add this, else references to sub package when using UDF do not work
spark.sparkContext.addPyFile("subpkg2.zip")

...
...

# Some code here that uses Pandas UDF with PySpark


I also noticed that in Cloudera Data Science Workbench (I am not sure whether this is a general finding or specific to CDSW), if subpkg2 is at the root level (i.e. it is a package, not a sub package nested within pkg and subpkg1), then I do not have to zip up subpkg2, and the UDF recognizes all the custom modules directly. I am not sure why this is the case, and I am still looking for an answer to this question.

scripts
|__ analysis.py
subpkg2
|__ __init__.py
|__ file1.py
#########################
## scripts/analysis.py ##
#########################

# Everything is the same as in the original example, except that there is
# no need for this line. For some reason, UDFs recognize module
# references at the top level but not submodule references.

# spark.sparkContext.addPyFile("subpkg2.zip")

This brings me to the final debugging step that I tried on the original example. If we change the imports in the file to start with pkg.subpkg1, then we don't have to pass subpkg2.zip to the Spark context.

#########################
## scripts/analysis.py ##
#########################

import os
import sys

from pyspark.sql import SparkSession

# Add pkg to path
path = os.path.join(os.path.dirname(__file__), os.pardir)
sys.path.append(path)

# Specify full path here
from pkg.subpkg1.subpkg2 import file1

...
...

spark = (
    SparkSession
    .builder  # builder is an attribute in PySpark, not a method
    .master("local[*]")
    .appName("some app")
    .getOrCreate()
)
# Don't need to add the zip file anymore since we changed the imports to use the full path
# spark.sparkContext.addPyFile("subpkg2.zip")

...
...

# Some code here that uses Pandas UDF with PySpark
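
One possible way to reason about why the import style matters: the traceback in the question fails inside pickle.loads, and pickle records a module-level function by its module name rather than by its code. The worker then has to import that exact name. The sketch below uses json.dumps as a stand-in for a project function; pickled_module_name is a hypothetical helper. It assumes PySpark's serializer treats functions from importable modules the same way, which I have not verified for every version.

```python
import json
import pickle


def pickled_module_name(func):
    """Return the module name that pickle records for a module-level function.

    Hypothetical helper for illustration only.
    """
    return func.__module__


# json.dumps stands in for a project function such as one defined in file1.py.
payload = pickle.dumps(json.dumps)

# The module name travels inside the pickled bytes; the unpickling side must
# be able to import a module with exactly this name.
print(pickled_module_name(json.dumps))
print(b"json" in payload)
```

Under that assumption, a function imported with from subpkg2 import file1 would be recorded under subpkg2.file1, so the worker needs subpkg2 importable as a top-level name, while the full-path import records pkg.subpkg1.subpkg2.file1 and only needs pkg to be importable.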


Nikhil Gupta