
A simple PySpark program to compute pi via Monte Carlo approximation:

import random

def inside(p):
    x, y = p
    return x*x + y*y < 1

num_samples = 10000000
count = sc.parallelize(((random.random(), random.random()) for i in range(num_samples))).filter(inside).count()
pi = 4 * count / num_samples
print(pi)

Trying to understand caching of RDDs, I tried the following:

rdd = sc.parallelize(((random.random(), random.random()) for i in range(num_samples)))
cached_rdd = rdd.filter(inside).cache()
cached_rdd.count()

... and it crashed. I cannot make sense of the following error message:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-18-14d09cc35074> in <module>()
----> 1 cached_rdd.count()

/usr/local/opt/apache-spark/libexec/python/pyspark/rdd.py in count(self)
   1054         3
   1055         """
-> 1056         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1057 
   1058     def stats(self):

/usr/local/opt/apache-spark/libexec/python/pyspark/rdd.py in sum(self)
   1045         6.0
   1046         """
-> 1047         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
   1048 
   1049     def count(self):

/usr/local/opt/apache-spark/libexec/python/pyspark/rdd.py in fold(self, zeroValue, op)
    919         # zeroValue provided to each partition is unique from the one provided
    920         # to the final reduce call
--> 921         vals = self.mapPartitions(func).collect()
    922         return reduce(op, vals, zeroValue)
    923 

/usr/local/opt/apache-spark/libexec/python/pyspark/rdd.py in collect(self)
    822         """
    823         with SCCallSiteSync(self.context) as css:
--> 824             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    825         return list(_load_from_socket(port, self._jrdd_deserializer))
    826 

/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 raise Py4JJavaError(
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:
    322                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job 3 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:837)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:835)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:835)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1838)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1751)
    at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1924)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1923)
    at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:572)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 52610)
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.6.4_3/Frameworks/Python.framework/Versions/3.6/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/local/Cellar/python/3.6.4_3/Frameworks/Python.framework/Versions/3.6/lib/python3.6/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/usr/local/Cellar/python/3.6.4_3/Frameworks/Python.framework/Versions/3.6/lib/python3.6/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/local/Cellar/python/3.6.4_3/Frameworks/Python.framework/Versions/3.6/lib/python3.6/socketserver.py", line 696, in __init__
    self.handle()
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/serializers.py", line 685, in read_int
    raise EOFError
EOFError
----------------------------------------
clstaudt

1 Answer


This piece of code will cause you a lot of trouble:

sc.parallelize(((random.random(), random.random()) for i in range(num_samples))).filter(inside).count()

To create the RDD, Spark will evaluate the whole structure locally on the driver (you'll need roughly 3 times more memory than is required to store the data alone), and it will likely crash your machine.

The "right" way to do it is to initialize data as range and map:

def get_points(_):
    return ...  # Return tuple of points

sc.range(num_samples).map(get_points).filter(inside).count()

but a correct implementation is a bit tricky; see Random numbers generation in PySpark.
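
For illustration only, here is a minimal sketch of one common approach: seed a separate RNG per partition with mapPartitionsWithIndex so that the workers don't all draw from an identical state. The helper name `sample_points` is made up for this sketch, and it reuses `inside` and `num_samples` from the question; see the linked question for the caveats.

import random

def sample_points(partition_index, iterator):
    # Give each partition its own RNG so workers don't share identical state
    rng = random.Random(partition_index)
    for _ in iterator:
        yield (rng.random(), rng.random())

count = (sc.range(num_samples)
           .mapPartitionsWithIndex(sample_points)
           .filter(inside)
           .count())
print(4 * count / num_samples)

Note that seeding with the partition index alone makes every run produce the same numbers, which is exactly the kind of subtlety the linked question is about.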

Alper t. Turker
  • Yes, that solved the issue, and I agree that this is a better implementation. But how could I have diagnosed this from the error message? – clstaudt Apr 10 '18 at 16:43
  • Practice? There is no obvious indicator here, but based on `Job 3 cancelled because SparkContext was shut down` we know that your code caused a driver failure. This clearly suggests that the problem is not related to the distributed part of the code. And if you look at the code, the likely candidate is obvious, if you have enough Spark experience. – Alper t. Turker Apr 10 '18 at 17:16