
When I run the parsing code on a 1 GB dataset it completes without any errors, but when I attempt 25 GB of data at a time I get the errors below. I'm trying to understand how I can avoid these failures. Happy to hear any suggestions or ideas.

Different errors:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

org.apache.spark.shuffle.FetchFailedException: Failed to connect to ip-xxxxxxxx

org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/mnt/yarn/nm/usercache/xxxx/appcache/application_1450751731124_8446/blockmgr-8a7b17b8-f4c3-45e7-aea8-8b0a7481be55/08/shuffle_0_224_0.data, offset=12329181, length=2104094}

Cluster Details:

YARN: 8 nodes
Total cores: 64
Memory: 500 GB
Spark version: 1.5

Spark submit statement:

spark-submit --master yarn-cluster \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.shuffle.service.enabled=true \
    --executor-memory 4g \
    --driver-memory 16g \
    --num-executors 50 \
    --deploy-mode cluster \
    --executor-cores 1 \
    --class my.parser \
    myparser.jar \
    -input xxx \
    -output xxxx

One of the stack traces:

at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

5 Answers


This error is almost guaranteed to be caused by memory issues on your executors. I can think of a couple of ways to address these types of problems.

1) You could try to run with more partitions (do a repartition on your DataFrame). Memory issues typically arise when one or more partitions contain more data than will fit in memory.

2) I notice that you have not explicitly set spark.yarn.executor.memoryOverhead, so it will default to max(384, 0.10 * executorMemory), which in your case is roughly 400 MB. That sounds low to me. I would try increasing it to, say, 1 GB (note that if you increase memoryOverhead to 1 GB, you need to lower --executor-memory to 3 GB so that the total container request stays at 4 GB). A sketch combining 1) and 2) follows this list.

3) Look in the log files on the failing nodes. You want to look for the text "Killing container". If you see the text "running beyond physical memory limits", increasing memoryOverhead will - in my experience - solve the problem.
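
A minimal sketch of suggestions 1) and 2) together, assuming a Spark 1.5-era RDD job like the one in the question; the app name, input path, partition count of 400, and 1 GB overhead are illustrative placeholders, and the memory settings can just as well be passed as --conf / --executor-memory flags on the spark-submit line:

import org.apache.spark.{SparkConf, SparkContext}

// All values below are illustrative, not tuned recommendations.
val conf = new SparkConf()
  .setAppName("parser-with-more-overhead")            // hypothetical app name
  .set("spark.yarn.executor.memoryOverhead", "1024")  // 2) reserve ~1 GB of container overhead (MB)
  .set("spark.executor.memory", "3g")                 // 2) shrink the heap so the container total stays ~4 GB

val sc = new SparkContext(conf)

// 1) Spread the input over more (smaller) partitions so that no single
// partition has to hold an outsized chunk of the 25 GB input in memory.
val raw = sc.textFile("hdfs:///path/to/input")        // placeholder path
val repartitioned = raw.repartition(400)              // illustrative partition count
repartitioned.count()                                 // any action; stands in for the real parsing job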

  • Does number 2) also apply in standalone mode? If yes, how can we set it? I can't find a similar variable in standalone mode. – Laeeq May 02 '17 at 14:57
  • Isn't the overhead memory separate from the executor memory if it is specified explicitly? Why should we have to decrease `--executor-memory`? – thentangler Jan 12 '21 at 23:08
  • Typically, we want to maximize the use of memory, so if we have 4GB available we want to use all 4GB, hence setting `--executor-memory` to 4GB. If we want to now allocate 1GB of the 4GB available to `memoryOverhead`, then we have to scale down `--executor-memory` to 3GB. – Glennie Helles Sindholt Jan 14 '21 at 09:45
  • repartition(10) worked for me – Sushil Verma Jun 02 '22 at 03:25

In addition to the memory and network configuration issues described above, it's worth noting that for large tables (e.g. several TB here), org.apache.spark.shuffle.FetchFailedException can occur due to a timeout while retrieving shuffle partitions. To fix this problem, you can set the following:

SET spark.reducer.maxReqsInFlight=1;  -- Only pull one file at a time to use full network bandwidth.
SET spark.shuffle.io.retryWait=60s;  -- Increase the time to wait while retrieving shuffle partitions before retrying. Longer times are necessary for larger files.
SET spark.shuffle.io.maxRetries=10;
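
If the job is not driven from a SQL shell, the same three properties can be set on the SparkConf before the context is created; a minimal sketch, where the values simply mirror the SET statements above and the app name is a placeholder:

import org.apache.spark.{SparkConf, SparkContext}

// Mirror of the SET statements above, applied at application start-up.
val conf = new SparkConf()
  .setAppName("shuffle-fetch-tuning")           // hypothetical app name
  .set("spark.reducer.maxReqsInFlight", "1")    // pull one remote file at a time
  .set("spark.shuffle.io.retryWait", "60s")     // wait longer between fetch retries
  .set("spark.shuffle.io.maxRetries", "10")     // retry more times before failing the task

val sc = new SparkContext(conf)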

I've also had some good results by increasing the Spark timeout spark.network.timeout to a larger value, like 800 seconds. The default of 120 seconds will cause a lot of your executors to time out when under heavy load.
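
A minimal sketch of setting it, assuming the 800-second value suggested above (it can equally be passed as --conf spark.network.timeout=800s on spark-submit):

import org.apache.spark.{SparkConf, SparkContext}

// Raise the catch-all network timeout so heavily loaded executors are not
// treated as lost while they are still responding slowly.
val conf = new SparkConf()
  .setAppName("network-timeout-example")   // hypothetical app name
  .set("spark.network.timeout", "800s")

val sc = new SparkContext(conf)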


Okay, it's an old thread and there are quite a few answers around on Stack Overflow, but I lost a couple of days to this error and I think sharing the story might help.

There are actually a couple of ways this can happen. As Glennie's great answer mentions, this is most likely a memory issue, so make sure you have enough memory for everything. There are container-memory, AM-memory, map-memory, reduce-memory, etc. configurations to watch out for. Reading this can be a lot of help for finding the right configurations. You should pick the numbers yourself, but here are a few properties that I set.

yarn-site.xml

<property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>32768</value>
</property>

<property>
    <name>yarn.app.mapreduce.am.resource.mb</name>
    <value>4096</value>
</property>

<property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>4096</value>
</property>

mapred-site.xml

<property>
    <name>mapreduce.map.memory.mb</name>
    <value>4096</value>
</property>

<property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>4096</value>
</property>

These can fix some other errors that you might run into, such as the PySpark shell crashing on startup. But in my case, although some errors vanished (such as the MetadataFetchFailed errors), the problem persisted. The exact error was:

org.apache.spark.shuffle.FetchFailedException: Failed to connect to DB-ETA-C/x.x.x.x:34085

After playing around with every possible YARN and Spark property, from Spark timeouts to the YARN shuffle service, I ended up realizing that in the error logs the failed container was looking for x.x.x.x, the local (internal) IP, while running netstat -tulpn | grep <PORT NUM> returned y.y.y.y:34085, in which y.y.y.y is the external IP address. It wasn't a memory issue at all; it was merely a network configuration issue.

The Spark service was binding to the external interface only, because the hostname was associated with the external IP in /etc/hosts. After updating the /etc/hosts file the issue was fixed.

Bottom line: the error obviously says some container is not able to reach another one. This is usually due to containers failing because of memory issues, but it can just as well be a network issue, so watch out for those too, especially if you have multiple interfaces on your nodes.


If all the shuffle tasks are failing, then a probable cause can be a dependency conflict for netty. Excluding the netty dependencies from spark-core worked for me.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>netty</artifactId>
        </exclusion>
    </exclusions>
</dependency>