
I have a Spark job that takes three inputs and performs two outer joins. The data is in key-value format (String, Array[String]). The most important part of the code is:

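import org.apache.spark.HashPartitioner

// Both joins use the same partitioner; the result is cached because it is written out twice.
val partitioner = new HashPartitioner(8000)
val joined = inputRdd1.fullOuterJoin(inputRdd2.fullOuterJoin(inputRdd3, partitioner), partitioner).cache
// saveAsSequenceFile is presumably the asker's own helper (definition not shown).
saveAsSequenceFile(joined, filter="X")
saveAsSequenceFile(joined, filter="Y")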

I'm running the job on EMR with an r3.4xlarge driver node and 500 m3.xlarge worker nodes. The spark-submit parameters are:

spark-submit --deploy-mode client --master yarn-client --executor-memory 3g --driver-memory 100g --executor-cores 3 --num-executors 4000 --conf spark.default.parallelism=8000 --conf spark.storage.memoryFraction=0.1 --conf spark.shuffle.memoryFraction=0.2  --conf spark.yarn.executor.memoryOverhead=4000 --conf spark.network.timeout=600s

UPDATE: With these settings, the number of executors shown in the Spark jobs UI was 500 (one per node).

The exception I see in the driver log is the following:

17/10/13 21:37:57 WARN HeartbeatReceiver: Removing executor 470 with no recent heartbeats: 616136 ms exceeds timeout 600000 ms
17/10/13 21:39:04 ERROR ContextCleaner: Error cleaning broadcast 5
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600 seconds]. This timeout is controlled by spark.network.timeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
...

Some of the things I tried that failed:

  • I thought the problem might be that too many executors were being spawned and the driver had the overhead of tracking them. I tried reducing the number of executors by increasing executor-memory to 4g. This did not help.
  • I tried changing the driver instance type to r3.8xlarge. This did not help either.

Surprisingly, when I reduce the number of worker nodes to 300, the job runs fine. Does anyone have any other hypothesis as to why this would happen?

learnerer

https://stackoverflow.com/questions/40474057/what-are-possible-reasons-for-receiving-timeoutexception-futures-timed-out-afte
https://stackoverflow.com/questions/45171175/error-error-cleaning-broadcast-exception
– vaquar khan Oct 14 '17 at 02:12

2 Answers


Well, this is mostly a matter of understanding how resource allocation works in Spark.

According to your information, you have 500 nodes with 4 cores each, so you have 2000 cores. Your request asks for 4000 executors with 3 cores each, which means you are requesting 12000 cores from the cluster, and that many do not exist.
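The core arithmetic can be written out as a quick check. This is only a sketch in plain Scala with no Spark dependency; `CoreBudget`, `Cluster`, `Request`, and `oversubscription` are hypothetical names made up for illustration, and the numbers are the ones from the question (500 m3.xlarge nodes with 4 cores each, `--num-executors 4000`, `--executor-cores 3`).

```scala
// Hypothetical helper, not part of Spark: compares requested cores with cluster capacity.
object CoreBudget {
  final case class Cluster(nodes: Int, coresPerNode: Int) {
    def totalCores: Int = nodes * coresPerNode
  }

  final case class Request(numExecutors: Int, executorCores: Int) {
    def totalCores: Int = numExecutors * executorCores
  }

  // A ratio above 1.0 means the request asks for more cores than the cluster has.
  def oversubscription(cluster: Cluster, request: Request): Double =
    request.totalCores.toDouble / cluster.totalCores

  // Values taken from the question.
  val cluster = Cluster(nodes = 500, coresPerNode = 4)
  val request = Request(numExecutors = 4000, executorCores = 3)
}
```

With these inputs, `CoreBudget.cluster.totalCores` is 2000, `CoreBudget.request.totalCores` is 12000, and the oversubscription ratio is 6.0.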

This RPC timeout error is commonly associated with starting too many JVMs on the same machine, so that the machine cannot respond in time because too much is happening at once.

Keep in mind that --num-executors is best tied to the number of nodes, and the number of executor cores should be tied to the cores available on each node.

For example, an m3.xlarge has 4 cores and 15 GB of RAM. What is the best configuration for running a job there? That depends on what you are planning to do. If you are going to run just one job, I suggest setting it up like this:

spark-submit --deploy-mode client --master yarn-client --executor-memory 10g --executor-cores 4 --num-executors 500 --conf spark.default.parallelism=2000 --conf spark.yarn.executor.memoryOverhead=4000

This will allow your job to run fine. If your data fits comfortably on the workers, it is better to change default.parallelism to 2000, or you are going to lose a lot of time on shuffles.
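To sanity-check the memory settings, it helps to add the executor heap and the YARN overhead and see how many such containers fit on one node. The sketch below is plain Scala; `ContainerFit` and its functions are hypothetical names, and `physicalNodeMb` uses the 15 GB of physical RAM as an upper bound only. The real per-node limit is whatever `yarn.nodemanager.resource.memory-mb` is set to on the cluster, which is normally lower, and YARN's rounding of container sizes is ignored here.

```scala
// Hypothetical helper, not part of Spark or YARN.
object ContainerFit {
  // Size of one executor container in MB: JVM heap plus spark.yarn.executor.memoryOverhead.
  def containerMb(executorMemoryMb: Int, overheadMb: Int): Int =
    executorMemoryMb + overheadMb

  // How many containers of the given size fit under a per-node memory limit.
  def executorsPerNode(nodeLimitMb: Int, containerSizeMb: Int): Int =
    nodeLimitMb / containerSizeMb

  // ASSUMPTION: physical RAM used as an upper bound; the actual YARN limit is lower.
  val physicalNodeMb: Int = 15 * 1024

  // Question's settings: --executor-memory 3g, memoryOverhead=4000.
  val questionContainerMb: Int = containerMb(3 * 1024, 4000)

  // Settings suggested in this answer: --executor-memory 10g, memoryOverhead=4000.
  val suggestedContainerMb: Int = containerMb(10 * 1024, 4000)
}
```

Under that upper bound, the question's 7072 MB containers fit at most twice per node and the suggested 14240 MB container fits at most once. If the cluster's YARN limit is below 14240 MB, the suggested container cannot be scheduled at all, so the executor memory or the overhead would need to be lowered. The one executor per node reported in the question would be consistent with a limit below 2 × 7072 MB.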

However, I think the best approach is to keep the dynamic allocation that EMR enables by default: just set the number of cores, the parallelism, and the memory, and your job will run like a charm.

Thiago Baldim

I experimented with a lot of configurations, modifying one parameter at a time with 500 nodes. I finally got the job to work by lowering the number of partitions in the HashPartitioner from 8000 to 3000.

val partitioner = new HashPartitioner(3000)

So the driver is probably overwhelmed by the large number of shuffles that have to be done when there are more partitions, and hence the lower partition count helps.
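A rough way to see why the partition count matters: in a single shuffle, each map task can produce one block per reduce partition, so the number of blocks to track grows with the product of the two. The sketch below is plain Scala with hypothetical names (`ShuffleScale`, `maxShuffleBlocks`). It assumes the map side has as many tasks as the partitioner has partitions, which the question does not confirm, so treat the figures as an upper-bound illustration rather than a measurement.

```scala
// Hypothetical helper, not part of Spark.
object ShuffleScale {
  // Upper bound on shuffle blocks in one shuffle:
  // every map task may write one block for every reduce partition.
  def maxShuffleBlocks(mapTasks: Long, reducePartitions: Long): Long =
    mapTasks * reducePartitions

  // ASSUMPTION: map-side task count equals the partitioner's partition count.
  val with8000: Long = maxShuffleBlocks(8000L, 8000L)
  val with3000: Long = maxShuffleBlocks(3000L, 3000L)

  // How many times more blocks the 8000-partition setup can produce.
  val ratio: Double = with8000.toDouble / with3000
}
```

Under that assumption, 8000 partitions allow up to 64,000,000 blocks per shuffle against 9,000,000 for 3000 partitions, roughly seven times as many, and the job performs two such joins.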

learnerer