
I am using Apache Zeppelin on Apache Mesos, with 4 nodes and a total of 210 GB of RAM.

My Spark job correlates a small dataset of transactions with a large dataset of events. I'd like to match each transaction with the nearest event, based on time and on an ID (event time against transaction time, event ID against transaction ID).

I get the following error:

FetchFailed(null, shuffleId=1, mapId=-1, reduceId=20,
  message=org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Here is my algorithm:

// Group events by (id, truncated timestamp) and persist the grouped RDD
val groupRDD = event
    .map { evt => ((evt.id, evt.date_time.toString.dropRight(8)), evt) }
    .groupByKey(new HashPartitioner(128))
    .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Right outer join so every transaction is kept, even without matching events
val joinedRDD = groupRDD.rightOuterJoin(
    transactions.keyBy { transac => (transac.id, transac.dateTime.toString.dropRight(8)) })

// For each transaction, pick the event with the smallest time delay
val result = joinedRDD.mapValues { case (a, b) =>
    val goodTransac = a.getOrElse(List(GeoLoc("", 0L, "", "", "", "", "")))
        .reduce((v1, v2) => minDelay(b.dateTime, v1, v2))
    SomeClass(b.id, b....., goodTransac.date_time, .....)
}

The groupByKey shouldn't group too many elements per key (at most around 50).
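To double-check that assumption, I can run a quick probe on groupRDD (debugging only, not part of the job):

// Sanity check of the per-key group sizes produced by the groupByKey above
val groupSizes = groupRDD.mapValues(_.size.toDouble).values
println(groupSizes.stats()) // count, mean, stdev, max, min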

I have noticed that the error occurs when memory runs short, so I decided to persist serialized to both RAM and disk and switched the serializer to Kryo. I have also reduced spark.memory.storageFraction to 0.2 to leave more space for execution.
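For reference, those settings correspond to something like the following (in Zeppelin I actually set them as Spark interpreter properties; the SparkConf form here is only for illustration):

import org.apache.spark.SparkConf

// Illustration only: in Zeppelin these go into the Spark interpreter properties
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.memory.storageFraction", "0.2") // default is 0.5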

When I check the web UI, I can see that GC takes more and more time as the processing goes on. When the job finally fails, GC accounts for 20 of the 22 minutes of runtime, though not on all workers.

I have already reviewed "Why do Spark jobs fail with org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 in speculation mode?", but my cluster still has plenty of RAM: around 90 GB free on Mesos.

vgkowski
  • How do you know this is due to a large GC? Are there other errors? – Martin Serrano Mar 10 '16 at 13:39
  • How are you submitting the job, and what is the cluster type (i.e. standalone, yarn, or mesos)? And have you gone through [this](http://stackoverflow.com/questions/28901123/org-apache-spark-shuffle-metadatafetchfailedexception-missing-an-output-locatio)? – Mahendra Mar 10 '16 at 14:02
  • I have seen that the GC sometimes takes too much time: 20 min out of the 22 min runtime. I am using Zeppelin on Mesos. I have already checked this link, but my cluster still has plenty of RAM. I have 4 nodes with a total of 210 GB, and there is still 90 GB free on Mesos. – vgkowski Mar 10 '16 at 14:09
  • Can you please include a screenshot from Jobs and Stages in web UI? – Jacek Laskowski Dec 22 '16 at 15:06

1 Answer


What I'd do is check the number of partitions of the event RDD, and of the RDD after groupByKey, using RDD.getNumPartitions.
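For example, with the RDD names from your snippet:

// Partition counts before and after the shuffle (names taken from the question)
println(s"event partitions:    ${event.getNumPartitions}")
println(s"groupRDD partitions: ${groupRDD.getNumPartitions}")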

Using StorageLevel.MEMORY_AND_DISK_SER requires more I/O, which can slow down your executors, and the SER part can lead to longer GC pauses (after all, the datasets are in memory and have to be serialized, which almost doubles the memory requirement).

I'd strongly recommend not using MEMORY_AND_DISK_SER at this point.
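If you want to keep caching while debugging, a minimal alternative (just a sketch based on your snippet; dropping the persist entirely is also an option) is the default, deserialized storage level, which lets Spark recompute the partitions that don't fit in memory:

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

// Same grouping as in the question, persisted deserialized and memory-only
val groupRDD = event
    .map { evt => ((evt.id, evt.date_time.toString.dropRight(8)), evt) }
    .groupByKey(new HashPartitioner(128))
    .persist(StorageLevel.MEMORY_ONLY) // equivalent to .cache()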

I'd also check the dependency graph of the result RDD to see how many shuffles and partitions are used in every stage:

result.toDebugString
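In that output, every RDD in the lineage is prefixed with its number of partitions in parentheses, and the indentation marks shuffle boundaries, so a stage with an unexpectedly low (or high) partition count should stand out right away.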

There are quite a few places where things can go awry.

p.s. Attaching screenshots of the web UI's Jobs, Stages, Storage, and Executors pages would be very helpful for narrowing down the root cause.

Jacek Laskowski