
I have a Python library that I have cythonized using this approach. On all of my Spark cluster nodes, I have installed the whl file as follows.

pip install myapi-0.0.1-cp38-cp38-linux_x86_64.whl

When I submit a job to Spark standalone as follows, the code runs fine.

spark-submit \
    --master spark://172.18.0.32:7077 \
    test.py

When I submit a job via YARN with client deployment mode, the code also runs fine.

spark-submit \
    --master yarn \
    --deploy-mode client \
    test.py

However, when I submit a job via YARN cluster deployment mode, the code breaks.

spark-submit \
    --master yarn \
    --deploy-mode cluster \
    test.py

In particular, I get this error.

pickle.PicklingError: Can't pickle <cyfunction Data.compute.<locals>.<lambda> at 0x7f51874d72b0>: attribute lookup lambda on myapi.

The code myapi.utils.Data is nothing special, and looks like this.

class Data:
    def __init__(self, rdd):
        self.rdd = rdd

    def compute(self):
        return self.rdd.map(change_it).reduce(lambda a, b: a + b)

def change_it(n):
    a = lambda v: v
    b = lambda v: v
    c = lambda v: v
    d = lambda v: v
    e = lambda v: v
    f = lambda v: a(b(c(d(e(v)))))
    
    return f(n)

There are a few places (web and SO) discussing the difficulties of pickling nested functions with PySpark, pickle, cloudpickle and cythonized modules. However, none of the answers explains why the same code works in some modes and not in others, as I have observed in the experiments above.

Any explanation of why I see this behavior would be appreciated.

My environment setup is as follows with Spark.

  • Spark v3.3.1
  • Hadoop v3.2.1
  • Python v3.8
DavidW
Jane Wayne

1 Answer


I'm not hugely familiar with Spark, but I expect that "client" mode runs in the same Python process and thus doesn't require anything to be serialized (the objects can simply be created and used). "cluster" mode is presumably designed to run on a number of different computers, and thus requires the data passed to the separate processes to be pickled and unpickled in order to distribute it.

I believe cloudpickle and dill can look inside regular Python functions and lambdas, extract the bytecode, and then reconstruct the functions. This is not possible with Cython, because compiled functions have no Python bytecode to extract. On current versions of Cython, functions are pickled by name (thus lambdas aren't pickleable, nor is any inner function).
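To illustrate what "pickled by name" means, here is a minimal sketch using only the standard library pickle module on plain Python objects (no Cython or Spark involved). The helper name can_pickle is made up for this example. A function that can be found again by its module and name pickles fine; a lambda has no importable name, so the lookup fails.

```python
import operator
import pickle


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    # can_pickle is a hypothetical helper, not part of any library.
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # Depending on the Python version, a function that cannot be
        # looked up by name raises either of these exceptions.
        return False


# operator.add is stored as a reference to its module and name.
print(can_pickle(operator.add))

# A lambda has the name "<lambda>", which cannot be looked up on its module.
print(can_pickle(lambda a, b: a + b))
```

As far as I understand it, a cyfunction behaves like the second case whenever it is a lambda or an inner function, which would match the "attribute lookup" wording in the error above.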

Your options are:

  1. Rewrite any functions or lambdas that capture variables in terms of pickleable classes (with a __call__ method); replace other lambdas with def functions defined at the module or class scope.

  2. Try this experimental branch which makes a large portion of Cython functions pickleable.
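As a rough sketch of the first option, the module from the question could be restructured along these lines. The names identity, Compose and _pipeline are invented for illustration, and I have not tested this in a cythonized build on YARN, so treat it as a starting point rather than a verified fix. The idea is that every callable handed to Spark is either a module-level def, a standard library function, or an instance of a module-level class.

```python
import operator


def identity(v):
    # Module-level replacement for the "lambda v: v" helpers.
    return v


class Compose:
    """Callable object that applies the stored functions right to left."""

    def __init__(self, *funcs):
        self.funcs = funcs

    def __call__(self, v):
        for func in reversed(self.funcs):
            v = func(v)
        return v


# Built once at import time instead of inside change_it on every call.
_pipeline = Compose(identity, identity, identity, identity, identity)


def change_it(n):
    return _pipeline(n)


class Data:
    def __init__(self, rdd):
        self.rdd = rdd

    def compute(self):
        # operator.add replaces "lambda a, b: a + b", so no lambda is
        # created inside the compiled method.
        return self.rdd.map(change_it).reduce(operator.add)
```

Since change_it and operator.add can both be looked up by name, nothing here should depend on the pickler being able to inspect a function body.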

DavidW