
I'm using Spark 1.2 (2 workers with 8 GB each) with Cassandra, and I get a java.lang.OutOfMemoryError: Java heap space error. The error appears when I run my algorithm on a large amount of data (15M rows). Here is the error:

ver-akka.remote.default-remote-dispatcher-6] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at org.spark_project.protobuf.ByteString.copyFrom(ByteString.java:192)
at org.spark_project.protobuf.CodedInputStream.readBytes(CodedInputStream.java:324)
at akka.remote.ContainerFormats$SelectionEnvelope.<init>(ContainerFormats.java:223)
at akka.remote.ContainerFormats$SelectionEnvelope.<init>(ContainerFormats.java:173)
at akka.remote.ContainerFormats$SelectionEnvelope$1.parsePartialFrom(ContainerFormats.java:282)
at akka.remote.ContainerFormats$SelectionEnvelope$1.parsePartialFrom(ContainerFormats.java:277)
at org.spark_project.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
at org.spark_project.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
at org.spark_project.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
at org.spark_project.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
at org.spark_project.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
at akka.remote.ContainerFormats$SelectionEnvelope.parseFrom(ContainerFormats.java:494)
at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:62)
at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:161)
at akka.serialization.Serialization.deserialize(Serialization.scala:98)
at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/06/23 17:46:26 INFO DAGScheduler: Job 1 failed: reduce at CustomerJourney.scala:135, took 60.447249 s
Exception in thread "main" 15/06/23 17:46:26 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428)
at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375)
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:369)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/06/23 17:46:26 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

I've tried changing the spark.kryoserializer.buffer.mb and spark.storage.memoryFraction settings, but I still get the same error.
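For reference, this is roughly the shape of the configuration I experimented with. It is only a minimal sketch with illustrative values (the property names are the Spark 1.2 ones), not my exact setup:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative values only; property names are the Spark 1.2 ones.
    val conf = new SparkConf()
      .setAppName("CustomerJourney")
      .set("spark.kryoserializer.buffer.mb", "64")   // Kryo serializer buffer, in MB
      .set("spark.storage.memoryFraction", "0.4")    // fraction of the heap reserved for cached RDDs
      // Executor-side heap dumps on OOME can be requested here as well:
      .set("spark.executor.extraJavaOptions", "-XX:+HeapDumpOnOutOfMemoryError")
    val sc = new SparkContext(conf)

Note that the equivalent heap-dump option for the driver JVM has to be supplied at launch time (for example through spark-submit), because the driver is already running by the time SparkConf is applied.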

I also created a heap dump with Eclipse MAT to determine which objects caused the problem. Here is the result I got (screenshot omitted):

The java.lang.ref.Finalizer class was eating a lot of memory. After some research on Google, I found this question: is memory leak? why java.lang.ref.Finalizer eat so much memory

Here is the answer to that question that I found (a short sketch after its bullet points illustrates the mechanism):

Some classes implement the Object.finalize() method. Objects which override this method need to be processed by a background finalizer thread, and they can't be cleaned up until this happens. If these tasks are short and you don't discard many of these objects, it all works well. However, if you are creating lots of these objects and/or their finalizers take a long time, the queue of objects to be finalized builds up. It is possible for this queue to use up all the memory.

The solutions are:

- don't use objects with finalize() if you can avoid it (if you are writing the class yourself)
- make finalize() very short (if you have to use it)
- don't discard such objects every time (try to re-use them)
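To make that mechanism concrete, here is a minimal, hypothetical sketch (the class and object names are made up, and this code is not from my application): every discarded instance of a class that overrides finalize() has to wait for the single background finalizer thread, so a slow finalize() lets the queue, and with it the heap, fill up.

    // Hypothetical illustration: instances pile up on the finalizer queue
    // because finalize() runs on one background thread and is slow.
    class SlowToFinalize {
      private val payload = new Array[Byte](1024 * 1024) // ~1 MB retained until finalized
      override def finalize(): Unit = Thread.sleep(10)   // deliberately slow cleanup
    }

    object FinalizerQueueDemo extends App {
      while (true) {
        new SlowToFinalize() // discarded immediately, but its memory is only
                             // reclaimable after the finalizer thread has run
      }
    }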

But in my case, I haven't written any class that uses finalize().

  • Seems like an issue caused by your algorithm, not an issue caused by Spark. – Branislav Lazic Jun 24 '15 at 08:08
  • You might want to instruct your JVM to create a heap dump on OOME (or create one yourself if the OOME does not happen too fast). You can then use Eclipse MAT to determine which objects the memory is spent on. I assume you already checked what Xms and Xmx were set to for your JVM? – Marged Jun 24 '15 at 08:28
  • Data input size? Executor memory size? Which algorithm? – eliasah Jun 24 '15 at 08:44
  • Thank you for your answers. I just updated my question to give more details and the result of the heap dump on OOME. – Amine CHERIFI Jun 24 '15 at 14:28

0 Answers