9

I'm getting random instances of shuffle files not being written (while using) Spark.

15/12/29 17:30:26 ERROR server.TransportRequestHandler: Error sending result 
    ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=347837678000, chunkIndex=0}, 
    buffer=FileSegmentManagedBuffer{file=/data/24/hadoop/yarn/local/usercache/root/appcache
    /application_1451416375261_0032/blockmgr-c2e951bb-856d-487f-a5be-2b3194fdfba6/1a/
    shuffle_0_35_0.data, offset=1088736267, length=8082368}} 
    to /10.7.230.74:42318; closing connection
java.io.FileNotFoundException: 
    /data/24/hadoop/yarn/local/usercache/root/appcache/application_1451416375261_0032/
    blockmgr-c2e951bb-856d-487f-a5be-2b3194fdfba6/1a/shuffle_0_35_0.data 
    (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    ...

It appears as if most of the shuffle files are written successfully, just not all of them.

This is the shuffle stage - or the 'reading shuffle files' stage.

At first all of the executors are able to read files. Eventually, and inevitably, one of the executors throws the exception above and is removed. All the others start to fail because they cannot retrieve those shuffle files.

enter image description here

I have 40GB of RAM in each executor, and I've got 8 executors. The extra one in this list is because of the removed executor after failure. My data is large, but I don't see any out of memory problems.

Any thoughts?


Changed my repartition call from 1000 partitions to 100000 partitions, and now I'm getting a new stacktrace.

Job aborted due to stage failure: Task 71 in stage 9.0 failed 4 times, most recent failure: Lost task 71.3 in stage 9.0 (TID 2831, dev-node1): java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
    at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
    at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
    at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
    at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
    at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
    at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:159)
    at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1179)
    at org.apache.spark.shuffle.hash.HashShuffleReader$$anonfun$3.apply(HashShuffleReader.scala:53)
    at org.apache.spark.shuffle.hash.HashShuffleReader$$anonfun$3.apply(HashShuffleReader.scala:52)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:173)
    at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$executePartition$1(sort.scala:160)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$4.apply(sort.scala:169)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$4.apply(sort.scala:169)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
    ...
Matt Andruff
  • 4,974
  • 1
  • 5
  • 21
Kirk Broadhurst
  • 27,836
  • 16
  • 104
  • 169

0 Answers0