I'm using Spark 1.2 (2 workers with 8 GB each) with Cassandra, and I'm getting a java.lang.OutOfMemoryError: Java heap space error. It appears when I run my algorithm on a large quantity of data (15M rows). Here is the error:
ver-akka.remote.default-remote-dispatcher-6] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at org.spark_project.protobuf.ByteString.copyFrom(ByteString.java:192)
at org.spark_project.protobuf.CodedInputStream.readBytes(CodedInputStream.java:324)
at akka.remote.ContainerFormats$SelectionEnvelope.<init>(ContainerFormats.java:223)
at akka.remote.ContainerFormats$SelectionEnvelope.<init>(ContainerFormats.java:173)
at akka.remote.ContainerFormats$SelectionEnvelope$1.parsePartialFrom(ContainerFormats.java:282)
at akka.remote.ContainerFormats$SelectionEnvelope$1.parsePartialFrom(ContainerFormats.java:277)
at org.spark_project.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
at org.spark_project.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
at org.spark_project.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
at org.spark_project.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
at org.spark_project.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
at akka.remote.ContainerFormats$SelectionEnvelope.parseFrom(ContainerFormats.java:494)
at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:62)
at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:161)
at akka.serialization.Serialization.deserialize(Serialization.scala:98)
at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/06/23 17:46:26 INFO DAGScheduler: Job 1 failed: reduce at CustomerJourney.scala:135, took 60.447249 s
Exception in thread "main" 15/06/23 17:46:26 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428)
at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375)
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:369)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/06/23 17:46:26 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
I've tried changing the configuration of spark.kryoserializer.buffer.mb and spark.storage.memoryFraction, but I still get the same error.
I also created a heap dump with Eclipse MAT to determine which objects caused the problem. Here is the result I got: the Finalizer class was eating a large share of the memory. After some research on Google I found this question: is memory leak? why java.lang.ref.Finalizer eat so much memory
Here is the answer to that question:
Some classes implement the Object.finalize() method. Objects which override this method need to be processed by a background finalizer thread, and they can't be cleaned up until this happens. If these tasks are short and you don't discard many such objects, it all works well. However, if you are creating lots of these objects and/or their finalizers take a long time, the queue of objects waiting to be finalized builds up. It is possible for this queue to use up all the memory.
The solution is to:
- not use finalize()d objects if you can (if you are writing the class yourself)
- make finalize() very short (if you have to use it)
- not discard such objects every time (try to re-use them)
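To make sure I understand the third point, here is a small sketch of the pattern that answer describes (the class names are hypothetical, not from my actual code): a class overriding finalize() forces every discarded instance through the JVM's finalizer queue, so re-using one instance avoids the queue building up.

```scala
// A class that overrides finalize(): each discarded instance sits on
// java.lang.ref.Finalizer's queue until the single finalizer thread
// processes it, which is what fills the heap under heavy churn.
class HeavyResource {
  override protected def finalize(): Unit = {
    // simulated slow cleanup work
  }
}

// Re-use one shared instance instead of discarding millions of them.
object ResourcePool {
  private val shared = new HeavyResource
  def withResource[A](f: HeavyResource => A): A = f(shared)
}
```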
But in my case, I haven't created any class that overrides finalize(). So where could these finalizable objects be coming from, and how can I fix this error?