
My fnl2 dataset is of the form:

scala> fnl2.first()
res4: org.apache.spark.mllib.regression.LabeledPoint = (0.0,(612515,[28693,86703,94568,162663,267733,292870,327313,347868,362660,396595,415817,436773,443713,470149,485282,486556,489594,496185,541453,570126,571088],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))

scala> fnl2.count()
res5: Long = 775946
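
Each record is a LabeledPoint over a sparse vector of dimension 612515, here with 21 one-hot features. For reference, the first record shown above corresponds to a constructor call along these lines (index lists truncated):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Label 0.0 over a SparseVector of size 612515; the listed indices
// hold 1.0 and every other index is implicitly 0.0.
val row = LabeledPoint(0.0,
  Vectors.sparse(612515,
    Array(28693, 86703, 94568 /* ... */),
    Array(1.0, 1.0, 1.0 /* ... */)))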

Then I try to build an SVMWithSGD model:

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// Split the data into training (60%) and test (40%) sets.
val splits = fnl2.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Run the training algorithm to build the model.
val numIterations = 100
val model = SVMWithSGD.train(training, numIterations)
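
Once training succeeds, the plan is to evaluate on the test split with the imported BinaryClassificationMetrics, roughly as sketched below (this step is never reached):

// Clear the default threshold so predict() returns raw scores.
model.clearThreshold()

// Pair each raw score with the true label on the held-out set.
val scoreAndLabels = test.map { point =>
  (model.predict(point.features), point.label)
}

// Summarize performance as area under the ROC curve.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
println("Area under ROC = " + metrics.areaUnderROC())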

But I get the following Java heap space error, and then the Spark context shuts down unexpectedly:

15/08/10 09:15:41 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/08/10 09:15:41 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
15/08/10 09:23:50 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-30] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
    at com.google.protobuf_spark.ByteString.toByteArray(ByteString.java:213)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:24)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/08/10 09:23:56 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(eastspark1,57211) not found
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


scala> 15/08/10 09:23:56 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(10.2.0.14,37151) not found
15/08/10 09:23:56 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(10.2.0.16,54187)
java.nio.channels.ClosedChannelException
    at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:257)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:300)
    at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
    at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The Spark context runs with 12 cores and 8 GB of memory on each node.

Any ideas?

EDIT

This is the error I get after increasing the driver's memory to 5 GB with export SPARK_DRIVER_MEMORY="5000M":

scala> val model = SVMWithSGD.train(training, numIterations)
15/08/10 11:33:07 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/08/10 11:33:07 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
Exception in thread "qtp950243028-158" java.lang.OutOfMemoryError: GC overhead limit exceeded
15/08/10 11:46:26 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-32] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
    at com.google.protobuf_spark.ByteString.copyFrom(ByteString.java:90)
    at com.google.protobuf_spark.CodedInputStream.readBytes(CodedInputStream.java:289)
    at akka.remote.WireFormats$SerializedMessage$Builder.mergeFrom(WireFormats.java:2700)
    at akka.remote.WireFormats$SerializedMessage$Builder.mergeFrom(WireFormats.java:2546)
    at com.google.protobuf_spark.CodedInputStream.readMessage(CodedInputStream.java:275)
    at akka.remote.WireFormats$RemoteEnvelope$Builder.mergeFrom(WireFormats.java:1165)
    at akka.remote.WireFormats$RemoteEnvelope$Builder.mergeFrom(WireFormats.java:949)
    at com.google.protobuf_spark.CodedInputStream.readMessage(CodedInputStream.java:275)
    at akka.remote.WireFormats$AckAndEnvelopeContainer$Builder.mergeFrom(WireFormats.java:479)
    at akka.remote.WireFormats$AckAndEnvelopeContainer$Builder.mergeFrom(WireFormats.java:300)
    at com.google.protobuf_spark.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:300)
    at com.google.protobuf_spark.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:238)
    at com.google.protobuf_spark.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:162)
    at com.google.protobuf_spark.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:716)
    at com.google.protobuf_spark.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:238)
    at com.google.protobuf_spark.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:153)
    at com.google.protobuf_spark.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:709)
    at akka.remote.WireFormats$AckAndEnvelopeContainer.parseFrom(WireFormats.java:234)
    at akka.remote.transport.AkkaPduProtobufCodec$.decodeMessage(AkkaPduCodec.scala:181)
    at akka.remote.EndpointReader.akka$remote$EndpointReader$$tryDecodeMessageAndAck(Endpoint.scala:821)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:755)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/08/10 11:46:45 WARN AbstractNioWorker: Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
    at org.jboss.netty.buffer.HeapChannelBuffer.<init>(HeapChannelBuffer.java:42)
    at org.jboss.netty.buffer.BigEndianHeapChannelBuffer.<init>(BigEndianHeapChannelBuffer.java:34)
    at org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134)
    at org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:69)
    at org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:75)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:472)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:333)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  • How much memory did you allocate for the JVM? I suggest reading http://stackoverflow.com/questions/8331135/how-to-prevent-java-lang-outofmemoryerror-permgen-space-at-scala-compilation. The default 512M might not be enough for you. – GameOfThrows Aug 10 '15 at 09:59
  • I re-ran it with 5 GB of memory (`export SPARK_DRIVER_MEMORY="5000M"`) and am still facing a problem, though it's slightly different now. Please have a look at my edit. – user706838 Aug 10 '15 at 12:06
  • The problem might be that your data is simply too big to be kept in memory. You could try training the SVM model on a smaller sample. – Till Rohrmann Aug 10 '15 at 12:38
  • Weird, the data set does not look that big, yet it is throwing GC overhead exceeded. Do try to reduce the size of fnl2 and see if it works. It could also be due to the way you are calling SVMWithSGD. – GameOfThrows Aug 10 '15 at 13:07
  • Finally it works. Instead of using `export SPARK_DRIVER_MEMORY="5000M"`, I increased the driver's memory at the Spark config level (sketched below). – user706838 Aug 14 '15 at 10:29
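
For reference, "the Spark config level" presumably means setting the driver memory before the driver JVM launches, e.g. in conf/spark-defaults.conf or on the spark-shell command line (the exact value is whatever the machine can spare):

# conf/spark-defaults.conf
spark.driver.memory  5g

# or, equivalently, at launch time:
./bin/spark-shell --driver-memory 5g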
