I have a question regarding the correct way to install new packages on Spark worker nodes, using Databricks and MLflow.
What I currently have is the following:
- a training script (using cv2, i.e. the opencv-python library) which logs the tuned ML model, together with its dependencies, to the MLflow Model Registry
- an inference script which reads the logged ML model together with the saved conda environment, as a spark_udf
- an installation step which reads the conda environment and installs all packages at their required versions, via pip install (wrapped in a subprocess.call)
- and then the actual prediction where the spark_udf is called on new data (these steps are sketched below).
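Roughly, the inference side looks like this (a simplified sketch, not my exact code; model_uri and df are placeholders, and I'm paraphrasing the dependency installation with mlflow.pyfunc.get_model_dependencies for brevity):

import subprocess
import sys
import mlflow
from pyspark.sql.functions import struct

model_uri = "models:/my_model/Production"  # placeholder registered-model URI

# install the model's logged pip dependencies (this runs on the driver)
requirements_file = mlflow.pyfunc.get_model_dependencies(model_uri)
subprocess.call([sys.executable, "-m", "pip", "install", "-r", requirements_file])

# load the logged model as a spark_udf
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri, result_type="double")

# the actual prediction on new data (df is a placeholder Spark DataFrame)
predictions = df.withColumn("prediction", predict_udf(struct(*df.columns)))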
The last step is what fails with ModuleNotFoundError.
SparkException: Job aborted due to stage failure: Task 8 in stage 95.0 failed 4 times, most recent failure: Lost task 8.3 in stage 95.0 (TID 577) (xxx.xxx.xxx.xxx executor 0): org.apache.spark.api.python.PythonException: 'ModuleNotFoundError: No module named 'cv2''
I have been closely following the content of this article which seems to cover this same problem:
- "One major reason for such issues is using udfs. And sometimes the udfs don’t get distributed to the cluster worker nodes."
- "The respective dependency modules used in the udfs or in main spark program might be missing or inaccessible from\in the cluster worker nodes."
So it seems that at the moment, despite using spark_udf and the conda environment logged to MLflow, the installation of the cv2 module only happened on my driver node, but not on the worker nodes. If this is true, I now need to programmatically make these extra dependencies (namely, the cv2 Python module) available to the executors/worker nodes.
So what I did was import cv2 in my inference script, retrieve the path of cv2's __init__ file, and add it to the Spark context, similarly to how it is done for the arbitrary "A.py" file in the blog post above:
import os
import cv2

spark.sparkContext.addFile(os.path.abspath(cv2.__file__))
This doesn't seem to change anything, though. I assume the reason is, partly, that I don't want to add just the single __init__.py file, but to make the entire cv2 library accessible to the worker nodes; the call above only ships the __init__.py. I'm certain that adding all files of all cv2 submodules one by one is also not the way to go, but I haven't been able to figure out how I could achieve this easily, with a command similar to the addFile() above.
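What I have in mind is something along the lines of zipping the whole cv2 package directory and shipping the archive (addPyFile accepts .zip files), but I haven't gotten this to work, and I doubt it can work at all for a package like cv2 that ships compiled extensions - so treat this only as a sketch of the idea:

import os
import shutil
import cv2

# zip the entire cv2 package directory (assuming it is literally named "cv2"
# inside site-packages), not just its __init__.py
package_dir = os.path.dirname(cv2.__file__)
archive = shutil.make_archive("/tmp/cv2_pkg", "zip",
                              root_dir=os.path.dirname(package_dir),
                              base_dir="cv2")

# ship the archive to the executors; addPyFile puts .zip files on their sys.path
spark.sparkContext.addPyFile(archive)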
Similarly, I also tried the other option, addPyFile(), by pointing it to the cv2 module's root directory (the parent of __init__.py):
import os
import cv2

spark.sparkContext.addPyFile(os.path.dirname(cv2.__file__))
but this also didn't help, and I still got stuck with the same error. Furthermore, I would like this process to be automatic, i.e. not having to manually set module paths in the inference code.
Similar posts I came across:
- Running spacy in pyspark, but getting ModuleNotFoundError: No module named 'spacy': here the only answer suggests to "restart spark session", though I'm not sure what that means in my specific case, with an active Databricks notebook and a running cluster.
- ModuleNotFoundError in PySpark Worker on rdd.collect(): here the answer states "you're not allowed to access the spark context from executor tasks", which might explain why both of my approaches above (addFile and addPyFile) failed. But if this is not allowed, what is the correct workaround?
- PySpark: ModuleNotFoundError: No module named 'app': here there is an informative answer stating "Your Python code runs on driver, but your udf runs on executor PVM. (...) use the same environment in both driver and executors.", but it's really not clear how to do that programmatically, inside the notebook - the sketch at the end of this post is the closest thing I can come up with.