
I am running a series of different jobs using Python/Spark, one at a time. To avoid creating a SparkContext each time, which takes a while, I want to pass the context as a parameter to each job. On top of that, I want the manager (the code that creates the context and runs the jobs) to have a timeout mechanism.

I get a strange error on the first job run; after that it disappears.

Traceback (most recent call last):
  File "/usr/lib/python3.4/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/usr/lib/python3.4/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dev/ComponentEngine/components/ExampleComponent/ExampleComponent.py", line 35, in run
    numbers = sparkContext.parallelize([1,2,3,4,5,6,7,8,9,10])
  File "/home/dev/Programs/spark-1.4.1-bin-hadoop2.6/python/pyspark/context.py", line 395, in parallelize
    readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
  File "/home/dev/Programs/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 772, in __getattr__
    raise Py4JError('{0} does not exist in the JVM'.format(name))
py4j.protocol.Py4JError: PythonRDD does not exist in the JVM

The code:

#!/bin/python3
import multiprocessing
from pyspark import SparkContext

def doJob(sc):
    try:
        # Use the context passed in from the parent process
        sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    except Exception as e:
        print('Got exception {}'.format(e))

def runWithTimeout(sc):
    # args must be a tuple, hence the trailing comma
    p = multiprocessing.Process(target=doJob, name="runWithTimeout", args=(sc,))
    p.start()

    # Wait until the timeout
    p.join(10)

    if p.is_alive():
        p.terminate()
        p.join()


if __name__ == '__main__':
    sc = SparkContext()
    for i in range(3):
        runWithTimeout(sc)

Why does this error appear?
Is there any problem with passing a SparkContext this way? I know that it gets serialized and used by the process on the other end, and that if the job makes any change to the context, the engine's copy won't be affected.
Is there any such change that would interfere with the runs of other jobs?

Rtik88

1 Answer


Serializing a SparkContext doesn't work well in any of the supported languages. Generally what's done is something along the lines of the IBM Spark Kernel or the Ooyala jobserver, where a single process holds the SparkContext and multiple clients talk to that server. In Python, the SparkContext includes network sockets which are used to communicate with the JVM SparkContext, and network sockets aren't really serializable objects. Looking at py4j (the library Spark uses to communicate between Python and the JVM), multi-threading could work, since threads can share sockets, but multiple processes not so much.
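
Since threads can share the py4j sockets, one way to get a timeout without serializing the context is to run each job in a thread and cancel the Spark-side work if it overruns. A minimal sketch, assuming the job tags its work with setJobGroup so it can later be cancelled with cancelJobGroup (the group id "timed-job" and the 10-second timeout are arbitrary; note also that job-group propagation across Python threads has caveats in older PySpark versions):

import threading
from pyspark import SparkContext

def doJob(sc):
    # Tag everything this thread runs so the manager can cancel it later.
    sc.setJobGroup("timed-job", "job started by the manager", interruptOnCancel=True)
    sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).count()

def runWithTimeout(sc):
    t = threading.Thread(target=doJob, args=(sc,))
    t.start()
    t.join(10)
    if t.is_alive():
        # Cancel the Spark-side job; the thread unblocks once Spark aborts it.
        sc.cancelJobGroup("timed-job")
        t.join()

if __name__ == '__main__':
    sc = SparkContext()
    for i in range(3):
        runWithTimeout(sc)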

Holden
  • A central Spark server sounds great, but there is still the problem of how to build the timeout mechanism. I found this post http://stackoverflow.com/questions/13682249/how-to-terminate-a-thread-in-python-without-loop-in-run-method, so I concluded it is better to use a process over a thread. I did a few tests: if I open a file/socket in the parent process and pass it to the child, the child can still write to it. And apart from the error in my post, the next 'doJob' works fine using the context passed to it from the parent process. So if the sockets in the context object are OK, there shouldn't be any problem? – Rtik88 Sep 08 '15 at 06:21
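
For reference, the kind of test the comment describes looks roughly like this. A minimal sketch, assuming the POSIX fork start method (the default on Linux); the file path is made up for illustration:

import multiprocessing
import os

def child(f):
    # With the fork start method, the child inherits the parent's open
    # file descriptor, so this write succeeds.
    f.write('written from pid {}\n'.format(os.getpid()))
    f.flush()

if __name__ == '__main__':
    with open('/tmp/shared.txt', 'w') as f:
        p = multiprocessing.Process(target=child, args=(f,))
        p.start()
        p.join()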