While integrating PySpark into my application's code base, I couldn't reference a class method inside an RDD's map() method. I reproduced the issue with the simple example below.
Here's a dummy class I defined. It adds a number to every element of an RDD that is derived from an RDD stored as a class attribute:
from pyspark import SparkContext

class Test:
    def __init__(self):
        self.sc = SparkContext()
        a = [('a', 1), ('b', 2), ('c', 3)]
        self.a_r = self.sc.parallelize(a)

    def add(self, a, b):
        return a + b

    def test_func(self, b):
        # Double every value, then add b to each result via the class method.
        c_r = self.a_r.map(lambda l: (l[0], l[1] * 2))
        v = c_r.map(lambda l: self.add(l[1], b))
        v_c = v.collect()
        return v_c
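I call it like this (with b = 5):

t = Test()
t.test_func(5)  # raises the PicklingError shown below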
test_func() calls the map() method on an RDD, and the lambda passed to it in turn calls the add() method on every element. Calling test_func() throws the following error:
pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
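If I understand correctly, the lambda in test_func() closes over self, so shipping it to the workers means pickling the entire Test instance, self.sc included. The stdlib pickle module can't serialize lambdas at all, but pickling a bound method directly shows what I believe is the same instance-dragging mechanism in a Spark-free sketch (threading.Lock stands in for the unpicklable SparkContext):

import pickle
import threading

class Holder:
    def __init__(self):
        # Stand-in for SparkContext: an attribute that cannot be pickled.
        self.resource = threading.Lock()

    def add(self, a, b):
        return a + b

h = Holder()
# Pickling anything that references the instance drags the whole
# instance, attributes and all, through the pickler:
pickle.dumps(h.add)  # TypeError: cannot pickle '_thread.lock' object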
Now, when I move the add() method out of the class (dropping the now-unneeded self parameter), like this:
from pyspark import SparkContext

def add(a, b):  # now a plain module-level function, no self
    return a + b

class Test:
    def __init__(self):
        self.sc = SparkContext()
        a = [('a', 1), ('b', 2), ('c', 3)]
        self.a_r = self.sc.parallelize(a)

    def test_func(self, b):
        c_r = self.a_r.map(lambda l: (l[0], l[1] * 2))
        v = c_r.map(lambda l: add(l[1], b))
        v_c = v.collect()
        return v_c
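For reference, I invoke it the same way (b = 5, which matches the output below):

t = Test()
print(t.test_func(5))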
Calling test_func() works properly now and prints:

[7, 9, 11]
Why does this happen, and how can I pass class methods to an RDD's map() method?