
Over the past few days I've been trying to understand how Spark executors know how to use a module by a given name upon import. I am working on AWS EMR. Situation: I initialize pyspark on EMR by typing

pyspark --master yarn

Then, in pyspark,

import numpy as np ## notice the naming

def myfun(x):
    n = np.random.rand(1)
    return x*n

rdd = sc.parallelize([1,2,3,4], 2)
rdd.map(lambda x: myfun(x)).collect() ## works!

My understanding is that when I import numpy as np, only the master node imports numpy and identifies it by the name np. However, with an EMR cluster (2 worker nodes), if I run the map function on the rdd, the driver program sends the function to the worker nodes to execute it for each item in the list (for each partition), and a successful result is returned.

My question is this: How do the workers know that numpy should be imported as np? Each worker has numpy already installed, but I haven't explicitly defined a way for each node to import the module as np.

Please refer to the following post by Cloudera for further details on dependencies: http://blog.cloudera.com/blog/2015/09/how-to-prepare-your-apache-hadoop-cluster-for-pyspark-jobs/

Under "Complex Dependency" they have a code example where the pandas module is explicitly imported on each node.
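For illustration, here is a minimal sketch of that pattern (not their exact code, and the helper name is mine): the import statement lives inside the function that gets shipped to the executors, so it runs in every worker process. It assumes pandas is installed on every node.

def sum_partition(part):
    import pandas as pd  ## executed on the executor, not on the driver
    return [pd.Series(list(part)).sum()]

rdd2 = sc.parallelize(range(100), 4)
rdd2.mapPartitions(sum_partition).collect()  ## one partial sum per partition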

One theory that I've heard thrown around is that the driver program distributes all code entered in the pyspark interactive shell. I am skeptical of this. The counterexample I bring up against this idea is: if on the master node I type

print "hello"

is every worker node also printing "hello"? I don't think so. But maybe I am wrong on this.
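One way I could check this (a sketch; socket is in the standard library, so it should be available on the workers): print the hostname at the top level, then collect hostnames from inside a mapped function and compare.

import socket

print "driver host: %s" % socket.gethostname()  ## runs only in the driver process

rdd = sc.parallelize([1, 2, 3, 4], 2)
print rdd.map(lambda _: socket.gethostname()).collect()  ## the lambda runs on the executors

The top-level print should appear once, in the shell, while the collected names should come from wherever YARN placed the executors.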

Jon

1 Answer


When a function is serialized, a number of objects are saved along with it:

  • code
  • globals
  • defaults
  • closure
  • dict

which can later be used to restore the complete environment required for a given function.

Since np is referenced by the function, it can be extracted from the function's code object:

from pyspark.cloudpickle import CloudPickler

CloudPickler.extract_code_globals(myfun.__code__)
## {'np'}

and the binding can be extracted from the function's globals:

myfun.__globals__['np']
## <module 'numpy' from ...

So the serialized closure (in a broad sense) captures all the information required to restore the environment. Of course, all modules accessed in the closure have to be importable on every worker machine.

Everything else is just reading and writing machinery.
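To make the round trip concrete, here is a rough sketch using the CloudPickleSerializer class from pyspark.serializers, which PySpark uses internally for closures (Spark wraps this in more machinery, but the principle is the same):

from pyspark.serializers import CloudPickleSerializer

ser = CloudPickleSerializer()
payload = ser.dumps(myfun)     ## roughly what is shipped to the executors
restored = ser.loads(payload)  ## what an executor reconstructs
print restored(3)              ## the np binding is resolved by importing numpy by name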

On a side note, the master node shouldn't execute any Python code. It is responsible for resource allocation, not for running application code.

zero323
  • Great, thanks for the input. With that said, does this mean that pieces of code like `print "hello"` are executed at each worker? Or is it ignored, and only the code necessary for the function to operate is executed? – Jon Aug 09 '16 at 00:01
  • Only code captured by the closure is actually executed on the workers. Everything else is ignored. – zero323 Aug 09 '16 at 00:15