Running into the following error when use custom UDF
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 603, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 449, in read_udfs
udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 251, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 71, in read_command
command = serializer._read_with_length(file)
File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 160, in _read_with_length
return self.loads(obj)
File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 430, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'jobs'
The import spark scripts looks something like this
from jobs.lib_a import a
from jobs.udf import udf_function #This is a UDF
The scripts itself is located in
jobs/scripts/test_script.py
, the entire jobs folder is zipped and then added to spark using pyFiles.
The weird thing is that the other import from jobs module works, only fail for udf.
I have tried approach in this post, creating a separate zip file called udf.zip, putting udf at top level and then add it to spark via pyFiles, but still run into ModuleNotFoundError when I try to import udf
.
I have also tried sys.path.append(<the udf path>)
The only approach works is when I copy the udf_function into the spark script test_script.py
. This wouldn't work in reality as the udf_function can be shared by other spark script.
The underlying system is: Python 3.8 Spark 3.2 Spark is running in kubernetes