I am running a series of different jobs using Python/Spark, one at a time. To avoid creating a SparkContext for each job, which takes a while, I want to pass the context as a parameter to each job. On top of that, I want the manager (the code that creates the context and runs the jobs) to have a timeout mechanism.
I get a strange error on the first job run; after that it disappears:
Traceback (most recent call last):
  File "/usr/lib/python3.4/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/usr/lib/python3.4/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dev/ComponentEngine/components/ExampleComponent/ExampleComponent.py", line 35, in run
    numbers = sparkContext.parallelize([1,2,3,4,5,6,7,8,9,10])
  File "/home/dev/Programs/spark-1.4.1-bin-hadoop2.6/python/pyspark/context.py", line 395, in parallelize
    readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
  File "/home/dev/Programs/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 772, in __getattr__
    raise Py4JError('{0} does not exist in the JVM'.format(name))
py4j.protocol.Py4JError: PythonRDD does not exist in the JVM
The code:
#!/bin/python3
import multiprocessing
from pyspark import SparkContext

def doJob(sc):
    try:
        sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    except Exception as e:
        print('Got exception {}'.format(e))

def runWithTimeout(sc):
    # Run the job in a separate process so the manager can enforce a timeout
    p = multiprocessing.Process(target=doJob, name="runWithTimeout", args=(sc,))
    p.start()
    # Wait till the timeout
    p.join(10)
    if p.is_alive():
        p.terminate()
        p.join()

if __name__ == '__main__':
    sc = SparkContext()
    for i in range(3):
        runWithTimeout(sc)
Why does this error appear?
Is there any problem with passing the SparkContext this way? I know that it gets serialized and used by the process on the other end, and that if the job makes any change to the context, the engine's copy won't be affected.
Is there any such change that would interfere with the runs of the other jobs?
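For reference, here is a minimal sketch of the copy semantics I am assuming (a plain Python object stands in for the SparkContext, and the class and attribute names are made up): the child process works on its own copy of whatever is passed in args, so a change made there is not visible back in the manager.

import multiprocessing

class FakeContext:
    def __init__(self):
        self.appName = 'manager'

def childChange(ctx):
    # Mutate the child's copy of the object
    ctx.appName = 'changed-in-child'
    print('child sees: {}'.format(ctx.appName))

if __name__ == '__main__':
    ctx = FakeContext()
    p = multiprocessing.Process(target=childChange, args=(ctx,))
    p.start()
    p.join()
    # Prints 'manager': the parent's object is untouched
    print('parent sees: {}'.format(ctx.appName))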