I am using Apache Zeppelin on Apache Mesos with 4 nodes and a total of 210 GB of RAM.
My Spark job correlates a small dataset of transactions with a large dataset of events. I'd like to match each transaction with the nearest event in time that has the same ID (event time against transaction time, event ID against transaction ID).
I get the following error:
FetchFailed(null, shuffleId=1, mapId=-1, reduceId=20,
message=org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Here is my algorithm:
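import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

// Group events by (id, truncated timestamp) so they can be joined with transactions.
val groupRDD = event
  .map { evt => ((evt.id, evt.date_time.toString.dropRight(8)), evt) }
  .groupByKey(new HashPartitioner(128))
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Keep every transaction, whether or not any events share its key.
val joinedRDD = groupRDD.rightOuterJoin {
  transactions.keyBy { transac => (transac.id, transac.dateTime.toString.dropRight(8)) }
}

// For each transaction, reduce its group of events to the one closest in time.
val result = joinedRDD.mapValues { case (a, b) =>
  val goodTransac = a.getOrElse(List(GeoLoc("", 0L, "", "", "", "", "")))
    .reduce((v1, v2) => minDelay(b.dateTime, v1, v2))
  SomeClass(b.id, b....., goodTransac.date_time,.....)
}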
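The minDelay helper used in the reduce is not shown above. For readers following the logic, here is a minimal sketch of what it is assumed to do: keep whichever of two events is closer in time to the transaction. The Event type, the timeMillis field, nearestEvent and the enclosing object are hypothetical simplifications for illustration, not the real GeoLoc class or the actual helper.

```scala
object NearestEventSketch {
  // Hypothetical, simplified stand-in for the event type; the real GeoLoc
  // has more fields and its timestamp type is not shown in the question.
  final case class Event(id: String, timeMillis: Long)

  // Assumed behaviour of minDelay: return the event whose timestamp is
  // closer to the transaction time (on a tie, the first event is kept).
  def minDelay(txTimeMillis: Long, e1: Event, e2: Event): Event =
    if (math.abs(e1.timeMillis - txTimeMillis) <= math.abs(e2.timeMillis - txTimeMillis)) e1
    else e2

  // Reducing a group of events with minDelay leaves the single nearest one;
  // reduceOption returns None for an empty group instead of throwing.
  def nearestEvent(txTimeMillis: Long, events: Iterable[Event]): Option[Event] =
    events.reduceOption((e1, e2) => minDelay(txTimeMillis, e1, e2))
}
```

With this sketch, a transaction at time 1000 and events at 400, 900 and 1500 would be matched with the event at 900.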
The groupByKey shouldn't group too many elements (maybe 50 per key at most).
I noticed that the error occurs when memory runs short, so I decided to persist the RDD in serialized form to memory and disk, and I switched the serializer to Kryo. I also reduced spark.memory.storageFraction to 0.2 to leave more room for processing.
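For reference, the two settings described above correspond roughly to the following. This is a sketch assuming they are applied through a SparkConf; under Zeppelin, where the interpreter creates the SparkContext, the same keys would normally be entered as Spark interpreter properties instead.

```scala
import org.apache.spark.SparkConf

// Sketch only: just the two properties mentioned above; all other
// settings are left at whatever the cluster already uses.
val conf = new SparkConf()
  // Use Kryo instead of the default Java serialization.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Shrink the share of unified memory protected for cached blocks.
  .set("spark.memory.storageFraction", "0.2")
```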
In the web UI I can see that GC takes more and more time as processing goes on. By the time the job finally fails, GC has taken 20 min of the 22 min runtime, though not on all workers.
I have already reviewed "Why do Spark jobs fail with org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 in speculation mode?", but my cluster still has plenty of RAM - around 90 GB free on Mesos.