
I am learning Apache Spark. After carefully reading the Spark tutorials, I understand how to pass a plain Python function to Spark to process an RDD, but I still have no idea how Spark works with methods defined inside a class. For example, here is my code:

import numpy as np
import copy
from pyspark import SparkConf, SparkContext

class A():
    def __init__(self, n):
        self.num = n

class B(A):
    ### Copy the item of class A to B.
    def __init__(self, A):
        self.num = copy.deepcopy(A.num)

    ### Print out the item of B
    def display(self, s):
        print s.num
        return s

def main():
    ### Locally run an application "test" using Spark.
    conf = SparkConf().setAppName("test").setMaster("local[2]")

    ### Setup the Spark configuration.
    sc = SparkContext(conf = conf)

    ### "data" is a list to store a list of instances of class A. 
    data = []
    for i in np.arange(5):
        x = A(i)
        data.append(x)

    ### "lines" separate "data" in Spark.  
    lines = sc.parallelize(data)

    ### Parallelly creates a list of instances of class B using
    ### Spark "map".
    temp = lines.map(B)

    ### Now I got the error when it runs the following code:
    ### NameError: global name 'display' is not defined.
    temp1 = temp.map(display)

if __name__ == "__main__":
    main()

In fact, I used the above code to generate a list of instances of class B in parallel via temp = lines.map(B). After that, I called temp1 = temp.map(display) because I wanted to print each item in that list of B instances in parallel. But now the error shows up: NameError: global name 'display' is not defined. I am wondering how I can fix this error while still using Spark's parallel computing. I would really appreciate any help.

Ruofan Kong
    1. `display` is a method, so what you want is `lambda x: x.display()`. 2. I've already mentioned that - if you're interested in side effects it is idiomatic to use `foreach` 3. Another thing I've already mentioned is that printing won't work as you expect. – zero323 Jul 07 '15 at 22:25
  • Thank you for your great answer!!! Now I am wondering why printing won't work as you mentioned here. – Ruofan Kong Jul 07 '15 at 22:32
  • Also, when I try `temp1 = temp.foreach(lambda x: x.display())` , it shows up a new error: `AttributeError: 'module' object has no attribute 'A'`. How can I fix this problem as well? I really appreciate for your help! – Ruofan Kong Jul 07 '15 at 22:41
    Well, `print` won't work for more or less the same reason why you see above error. I outlined it in the [answer for your previous question](http://stackoverflow.com/a/31256689/1560062). Everything involving operations on RDDs happens on worker nodes. It means that output from print goes there and not to the driver. If you want to use class it has to be shipped to workers as well. One way to do it is to create a module. See [here](http://stackoverflow.com/a/31095244/1560062) for details. – zero323 Jul 07 '15 at 23:06
  • Thanks! Since this code is stored in /mydir/test.py, I changed `sc = SparkContext(conf = conf)` to `sc = SparkContext(conf = conf, ['/mydir/test.py'])`, and I also changed `temp1 = temp.map(display)` to `temp1 = temp.map(lambda x: x.display()).reduce(lambda x: x)`, but I still got the error: `'module' object has no attribute 'A'`. Could you please help me find out the reasons? – Ruofan Kong Jul 08 '15 at 02:21

1 Answer


Structure

.
├── ab.py
└── main.py

main.py

import numpy as np
from pyspark import SparkConf, SparkContext
import os
from ab import A, B

def main():
    ### Locally run an application "test" using Spark.
    conf = SparkConf().setAppName("test").setMaster("local[2]")

    ### Setup the Spark configuration.
    sc = SparkContext(
        conf=conf,
        pyFiles=[os.path.join(os.path.abspath(os.path.dirname(__file__)), 'ab.py')]
    )

    data = []
    for i in np.arange(5):
        x = A(i)
        data.append(x)

    lines = sc.parallelize(data)
    temp = lines.map(B)

    temp.foreach(lambda x: x.display()) 

if __name__ == "__main__":
    main()

ab.py

import copy

class A():
    def __init__(self, n):
        self.num = n

class B(A):
    ### Copy the item of class A to B.
    def __init__(self, A):
        self.num = copy.deepcopy(A.num)

    ### Print out the item of B
    def display(self):
        print self.num
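
As a side note, the pyFiles argument in main.py is what ships ab.py to the worker processes when the SparkContext is created, so that A and B can be imported there. Two roughly equivalent alternatives, sketched under the same two-file layout (the paths and master URL here are only illustrative):

# Alternative 1: ship the module when submitting the job instead of in code:
#   spark-submit --master "local[2]" --py-files ab.py main.py

# Alternative 2: register the dependency on an already-created SparkContext:
import os
sc.addPyFile(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'ab.py'))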

Comments:

  • once again, printing is a bad idea. Even ignoring Spark's architecture, there is a good chance it will become a bottleneck in your program.
  • if you need diagnostic output, consider logging, or collect a small sample and inspect it locally: `for x in rdd.sample(False, 0.001).collect(): x.display()` (a short sketch follows this list)
  • for side effects use `foreach` instead of `map`
  • I modified the `display` method; I wasn't sure what `s` was supposed to be in this context
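
A minimal sketch of the sampling idea from the second bullet above, assuming the temp RDD of B instances built in main.py (the 10% fraction is arbitrary):

### Collect a small random sample of B instances to the driver and
### inspect them locally, instead of printing on the workers.
for x in temp.sample(False, 0.1).collect():
    x.display()
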
zero323
  • Wonderful Answers!!! Now it works perfectly. Also, I think I carefully read the Spark tutorials on this (https://spark.apache.org/docs/latest/index.html) , but I still get confused on the architecture as well as the usage of the functions. If possible, could you please recommend books or other tutorials? I would really appreciate it!!! – Ruofan Kong Jul 08 '15 at 15:42
    Generally speaking recommendations are off-topic on SO, but for starters you could try [AMP Camp](http://ampcamp.berkeley.edu/) or [Learning Spark](http://shop.oreilly.com/product/0636920028512.do) co-authored by @holden – zero323 Jul 08 '15 at 20:23