
I am trying to submit a Spark application to the local Kubernetes cluster on my machine (created via Docker Dashboard). The application depends on a Python package, let's call it X.

Here is the application code:

import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession
datafolder = "/opt/spark/data" # Folder created in container by spark's docker file
sys.path.append(datafolder) # X is contained inside of datafolder
from X.predictor import * # import functionality from X

def apply_x_functionality_on(item):
    predictor = Predictor() # class from X.predictor
    predictor.predict(item)

def main():
    spark = SparkSession\
            .builder\
            .appName("AppX")\
            .getOrCreate()
    sc = spark.sparkContext
    data = []
    # Read data: [no problems there]
    ...
    data_rdd = sc.parallelize(data) # create RDD
    data_rdd.foreach(lambda item: apply_x_functionality_on(item)) # call the function on each item

if __name__ == "__main__":
    main()

Initially I hoped to avoid such problems by putting the X folder into Spark's data folder. When the container is built, all the content of the data folder is copied to /opt/spark/data. My Spark application appends the contents of the data folder to the system path and consumes the package X that way. Or so I thought.

Everything works fine until the .foreach function is called. Here is a snippet from the logs with the error description:

20/11/25 16:13:54 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.1.0.60, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 587, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 74, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'X'
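
My current understanding (this is an assumption on my side about how PySpark works): sys.path.append only changes the path of the driver's Python process, while the function passed to foreach is pickled and executed in separate executor processes that never see that change, hence the ModuleNotFoundError on the worker. A minimal sketch of what I mean, reusing the paths from my setup:

import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("path-demo").getOrCreate()
sc = spark.sparkContext

sys.path.append("/opt/spark/data")  # affects only this driver process

def worker_sys_path(_):
    import sys as worker_sys
    return worker_sys.path  # sys.path as seen inside an executor

# The folder shows up on the driver's path, but typically not on the executors':
print("/opt/spark/data" in sys.path)
print(any("/opt/spark/data" in p
          for p in sc.parallelize([0]).map(worker_sys_path).first()))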

There are a lot of similar questions here: one, two, three, but none of the answers to them have helped me so far.

What I have tried:

  1. I submitted the application with a zipped X (I zip it inside the container by running zip on X):
$SPARK_HOME/bin/spark-submit \
  --master k8s://https://kubernetes.docker.internal:6443 \
  --deploy-mode cluster \
  --conf spark.executor.instances=5 \
  --conf spark.kubernetes.container.image=kostjaigin/spark-py:v3.0.1-X_0.0.1 \
  --py-files "local:///opt/spark/data/X.zip" \
  local:///opt/spark/data/MyApp.py
  2. I added the zipped X to the Spark context (a quick check of the archive layout is sketched right after this list):
sc.addPyFile("opt/spark/data/X.zip")
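
Since both attempts rely on the layout inside X.zip, one thing worth ruling out is the archive structure: for import X to work off a zip on the path, the package directory has to sit at the root of the archive (entries like X/__init__.py, X/predictor.py). A small check with the standard zipfile module, assuming the path from my submit command:

import zipfile

with zipfile.ZipFile("/opt/spark/data/X.zip") as zf:
    names = zf.namelist()
    print(names[:5])  # inspect the first few entries
    assert any(n.startswith("X/") for n in names), \
        "package X is not at the top level of the archive"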

1 Answer


I have resolved the issue:

  1. Created a dependencies folder under /opt/spark/data
  2. Put X into dependencies
  3. Inside my Dockerfile I pack the dependencies folder into a zip archive so I can submit it later via --py-files: cd /opt/spark/data/dependencies && zip -r ../dependencies.zip .
  4. In the application (a fuller, self-contained sketch is given at the end of this answer):
...
import os
from X.predictor import * # import functionality from X
...
# zipped package
zipped_pkg = os.path.join(datafolder, "dependencies.zip")
assert os.path.exists(zipped_pkg)
sc.addPyFile(zipped_pkg)
...
  5. Add the --py-files flag to the submit command:
$SPARK_HOME/bin/spark-submit \
  --master k8s://https://kubernetes.docker.internal:6443 \
  --deploy-mode cluster \
  --conf spark.executor.instances=5 \
  --py-files "local:///opt/spark/data/dependencies.zip" \
  local:///opt/spark/data/MyApp.py
  6. Run it

Basically, it all comes down to adding a dependencies.zip archive with all the required dependencies in it.
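
For completeness, here is a minimal sketch of how the pieces fit together in the driver script. Paths and the Predictor class are taken from the question (importing Predictor by name is my assumption, the question uses a star import), and unlike the snippet above the import of X happens inside the function that runs on the executors, one common way to make sure the module is only looked up after dependencies.zip is on the workers' path. Treat it as illustrative rather than a drop-in file:

import os
from pyspark.sql import SparkSession

datafolder = "/opt/spark/data"  # created in the container by Spark's docker file

def apply_x_functionality_on(item):
    # Imported here so the module is resolved on the executor,
    # where dependencies.zip has been made available via addPyFile / --py-files.
    from X.predictor import Predictor
    predictor = Predictor()
    return predictor.predict(item)

def main():
    spark = SparkSession.builder.appName("AppX").getOrCreate()
    sc = spark.sparkContext

    # Ship the zipped dependencies to every executor.
    zipped_pkg = os.path.join(datafolder, "dependencies.zip")
    assert os.path.exists(zipped_pkg)
    sc.addPyFile(zipped_pkg)

    data = []  # populate as in the question
    sc.parallelize(data).foreach(apply_x_functionality_on)

if __name__ == "__main__":
    main()

Whichever import style you use, the key point is that dependencies.zip actually reaches the executors, either through --py-files at submit time or through addPyFile at runtime (here: both).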
