2

I am currently working with an EMR cluster connecting to RDS to gather 2 table.

The two RDD created are quite huge but I can perform .take(x) operations other them.

I can also perform more complex operations such as:

info_rdd = somerdd.map(lambda x: (x[1], x[2])).groupByKey().map(some_lambda)
apps_rdd = apps.join(info_rdd).map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1])))

But doing the following operation to count the number of distinct users imported from RDS do not work:

unique_users = rdd.distinct.count()

I have tried many configuration to see if it was a memory issues before (just in case but it does not solve the problem) ...

These are the errors I am getting now:

Traceback (most recent call last):
File "/home/hadoop/AppEngine/src/server.py", line 56, in <module>
run_server()
File "/home/hadoop/AppEngine/src/server.py", line 53, in run_server
AppServer().run()
File "/home/hadoop/AppEngine/src/server.py", line 45, in run
api = create_app(self.context, self.apps, self.devices)
File "/home/hadoop/AppEngine/src/api.py", line 190, in create_app
engine = AppEngine(spark_context, apps, devices)
File "/home/hadoop/AppEngine/src/engine.py", line 56, in __init__
self.unique_users = self.ratings.distinct().count()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco

File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in  stage 0.0 failed 4 times, most recent failure: Lost task 0.5 in stage 0.0 (TID 5, ip-172-31-3-140.eu-west-1.compute.internal, executor 13): ExecutorLostFailure (executor 13 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 164253 ms
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)`
lelabo_m
  • 509
  • 8
  • 21
  • I'm referring to the message in exception `ExecutorLostFailure (executor 13 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 164253 ms` – mrsrinivas Feb 14 '17 at 12:03

2 Answers2

2

The solution for the problem was the following:

I did not have enough memory to perform the task. I changed the type of the core instance I was using in my cluster to instance with more memory available (m4.4xlarge here).

Then I had to precise parameters to force the memory allocation of my instances for the spark-sumbmit:

--driver-memory 2G
--executor-memory 50G

You can also add these parameters to avoid a long task from failling because of the heartbeat or the memory allocation:

--conf spark.yarn.executor.memoryOverhead=XXX (large number such as 1024 or 4096)
--conf spark.executor.heartbeatInterval=60s
lelabo_m
  • 509
  • 8
  • 21
  • Have you tried repartitioning your RDD on the key(both having the same number of partitions) prior to the join and groupBy ? If you do, please let me know about the result. – Sebastian Brestin Jan 22 '20 at 12:41
1

ExecutorLostFailure Reason: Executor heartbeat timed out after 164253 ms

This error means that the executor didn't respond after 165 seconds, and was killed (under the assumption that it is dead)

If by any chance you have a task which occupy the executor for such a long time, and is need to be executed you can try the following setting in the spark-submit command line which will increase the heartbeat timeout to a huge amount of time as mentioned here: https://stackoverflow.com/a/37260231/5088142

Some methods how to investigate this issue can be found here: https://stackoverflow.com/a/37272249/5088142


The below will try to clarify some issues which raised in your question.

Spark Actions vs Transformations

Spark uses lazy computation, i.e. when you perform transformation it doesn't execute it. Spark execute only when you perform action

In the complex operations example you gave there is no action (i.e. nothing was executed/computed):

info_rdd = somerdd.map(lambda x: (x[1], x[2])).groupByKey().map(some_lambda)
apps_rdd = apps.join(info_rdd).map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1])))

Reviewing spark doc about transformation

You can see that all operation used in the example: map, groupByKey and join are transformation.

Hence nothing actually was done after you execute those commands.

The difference between actions

The two RDD created are quite huge but I can perform .take(x) operations other them.

There is a difference between take(x) action, and count

take(x) action ends after it returned the first x elements.

count() action ends only after it pass the entire RDD

The fact that you execute some transformation (as in the example) which were seems to be running has no meaning - as they weren't executed.

Running take(x) action can't give any indication as it will use only very small portion of your RDD.

Conclusion

It seems like the configuration of your machine doesn't support the size of the data you are using, or your code create huge tasks which cause the executors to hang for a long period of time (160 seconds).

The first action which was actually executed on your RDD was the count action

Community
  • 1
  • 1
Yaron
  • 10,166
  • 9
  • 45
  • 65