
Running into the following error when using a custom UDF:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 603, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 449, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 251, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 71, in read_command
    command = serializer._read_with_length(file)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'jobs'

The imports in the Spark script look something like this:

from jobs.lib_a import a
from jobs.udf import udf_function #This is a UDF

The script itself is located in jobs/scripts/test_script.py; the entire jobs folder is zipped and then added to Spark via pyFiles.
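
For reference, a minimal sketch of the assumed layout and packaging step (file and archive names here are illustrative, not the exact project structure):

# Assumed package layout:
#   jobs/
#     __init__.py
#     lib_a.py            # defines a
#     udf.py              # defines udf_function
#     scripts/
#       test_script.py    # the Spark script above
import shutil

# Build jobs.zip with the jobs/ package at the archive root, so that
# `from jobs.udf import udf_function` can resolve once the zip is on sys.path
shutil.make_archive("jobs", "zip", root_dir=".", base_dir="jobs")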

The weird thing is that the other imports from the jobs module work; it only fails for the UDF.

I have tried the approach in this post: creating a separate zip file called udf.zip with udf at the top level and adding it to Spark via pyFiles, but I still run into ModuleNotFoundError when I try to import udf.

I have also tried sys.path.append(<the udf path>).

The only approach that works is copying udf_function into the Spark script test_script.py itself. That wouldn't work in practice, as udf_function is shared by other Spark scripts.
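
For context, a minimal sketch of what that inlined workaround looks like (the body of udf_function here is purely illustrative; the real one lives in jobs/udf.py):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# Stand-in for jobs.udf.udf_function, defined directly in test_script.py;
# pickling succeeds because the function comes from the script itself,
# not from the zipped jobs package
def udf_function(value):
    return value.upper() if value is not None else None

udf_upper = F.udf(udf_function, StringType())

df = spark.createDataFrame([("a",), ("b",)], ["some_column"])
df.withColumn("result", udf_upper(F.col("some_column"))).show()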

The underlying system: Python 3.8, Spark 3.2, with Spark running on Kubernetes.

Harry Su
  • Have you tried a dot (.) in front of "jobs"? Actually, the error is pretty clear, and I would review the hierarchy of your folders and how modules are referenced in imports. – Alexander Goida Feb 24 '23 at 09:18
  • But importing other libs from the jobs module works fine, e.g. `from jobs.lib_a import a`. This only happens when trying to import the UDF. Not sure how adding a dot would help – Harry Su Feb 24 '23 at 17:56
  • Is your Spark system just a single driver, or a cluster with multiple nodes? If it's a cluster, then you need to distribute your `udf` to each node. – SEUNGFWANI Feb 28 '23 at 02:20
  • @HarrySu I think the declaration could be different for udf and lib_a. I'm using custom modules and I use dots (.) everywhere. It could be that `lib_a` is listed in `__init__.py` and `udf` is not. Could you please check that? – Alexander Goida Mar 01 '23 at 13:59
  • @SEUNGFWANI It's running with one driver and multiple executors; all the code is packaged as a zip file and passed to Spark via pyFiles, so it should be available on all nodes. That's also how it accesses the other libraries. – Harry Su May 08 '23 at 20:27

1 Answer


I was able to make it work.

Some more context: we are leveraging the spark-on-k8s-operator (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator), so we pass in the zip file via pyFiles.

This works the same as the following,

spark_session.sparkContext.addPyFile('/opt/spark/pyfiles/python.zip')

if we set it up in the Spark script ourselves.
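
For clarity, a sketch of that in-script setup (the zip path is the one used in this deployment; note that the import from jobs happens only after addPyFile, so the module can be resolved on the workers):

from pyspark.sql import SparkSession

spark_session = SparkSession.builder.getOrCreate()

# Ship the zipped package to the driver and every executor and add it to
# their sys.path before anything from it is imported
spark_session.sparkContext.addPyFile('/opt/spark/pyfiles/python.zip')

# Only now import the UDF, so the jobs module resolves from the zip
from jobs.udf import udf_function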

Initially, we passed it in as

pyFiles:
- local:///opt/spark/pyfiles/spinner-python.zip

But local:/// points to the working directory; we had to change it to the following, with an extra slash (/), to point to an absolute path.

pyFiles:
- local:////opt/spark/pyfiles/spinner-python.zip

When PySpark serializes a UDF, it sends a copy of the UDF code to all the worker nodes, so we have to point pyFiles at an absolute path instead of a relative one.

Harry Su
  • This is not true, actually. The final fix that works is adding `spark_session.sparkContext.addPyFile('/opt/spark/pyfiles/python.zip')` – Harry Su May 18 '23 at 21:45
  • The reason is that spark-submit --py-files only distributes the zip file to the workers but does not add it to PYTHONPATH. See: https://stackoverflow.com/questions/36461054/i-cant-seem-to-get-py-files-on-spark-to-work The other way to fix it is to add the zip to the executor PYTHONPATH via `spark.executorEnv.PYTHONPATH: /opt/spark/pyfiles/python.zip:$PYTHONPATH` (see the sketch after these comments). – Harry Su May 19 '23 at 18:00
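
For completeness, a minimal sketch of that second workaround, setting the executor PYTHONPATH from the Spark conf so the shipped zip becomes importable (a simplified form of the conf key quoted in the comment above; in the operator spec it was written with `:$PYTHONPATH` appended to preserve the existing path):

from pyspark.sql import SparkSession

# Alternative to addPyFile: make the zip distributed via --py-files importable
# on the executors by putting it on their PYTHONPATH (path as used in this answer)
spark_session = (
    SparkSession.builder
    .config("spark.executorEnv.PYTHONPATH", "/opt/spark/pyfiles/python.zip")
    .getOrCreate()
)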