I have been trying to execute a Scala program, and the output almost always looks something like this:
15/08/17 14:13:14 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:64)
at java.lang.StringBuilder.<init>(StringBuilder.java:97)
at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:339)
at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:83)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2344)
at org.json4s.jackson.JsonMethods$class.compact(JsonMethods.scala:32)
at org.json4s.jackson.JsonMethods$.compact(JsonMethods.scala:44)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:143)
at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:169)
at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56)
at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
or like this:
15/08/19 11:45:11 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.createInstance(DefaultSerializerProvider.java:526)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.createInstance(DefaultSerializerProvider.java:505)
at com.fasterxml.jackson.databind.ObjectMapper._serializerProvider(ObjectMapper.java:2846)
at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:17)
at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:17)
at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:2881)
Are these errors on the driver or executor side?
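If it helps narrow things down: both traces originate in EventLoggingListener serializing events on the SparkListenerBus, so one isolated experiment would be to turn event logging off in spark-defaults.conf (spark.eventLog.enabled is a standard Spark option; this would only test whether the listener is the allocation site, it is not a fix):

spark.eventLog.enabled false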
I am a bit confused by the memory variables that Spark uses. My current settings are:
spark-env.sh
export SPARK_WORKER_MEMORY=6G
export SPARK_DRIVER_MEMORY=6G
export SPARK_EXECUTOR_MEMORY=4G
spark-defaults.conf
# spark.driver.memory 6G
# spark.executor.memory 4G
# spark.executor.extraJavaOptions ' -Xms5G -Xmx5G '
# spark.driver.extraJavaOptions ' -Xms5G -Xmx5G '
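For context, the job is launched with something like this (application class, jar name, and master URL are placeholders, not my real values):

spark-submit --class com.example.MyApp \
    --master spark://master:7077 \
    --driver-memory 6G \
    --executor-memory 4G \
    myapp.jar

As far as I know, --driver-memory and --executor-memory are just command-line forms of spark.driver.memory and spark.executor.memory.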
Do I need to uncomment any of the variables contained in spark-defaults.conf, or are they redundant?
Is setting SPARK_WORKER_MEMORY, for example, equivalent to setting spark.executor.memory?
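To check which of these values actually wins at runtime, the resolved configuration can be printed from inside the job; a minimal sketch, assuming sc is the live SparkContext:

// Dump every setting the running application resolved, one per line.
println(sc.getConf.toDebugString)
// Or query individual keys; getOption returns None when a key is unset.
println(sc.getConf.getOption("spark.driver.memory"))
println(sc.getConf.getOption("spark.executor.memory"))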
The part of my Scala code where it fails after a few iterations:
// Collect the distinct cluster/component ids stored in the vertex attributes.
val filteredNodesGroups = connCompGraph.vertices
  .map { case (_, array) => array(pagerankIndex) }
  .distinct
  .collect

for (id <- filteredNodesGroups) {
  // Restrict the graph to the vertices of this component.
  val clusterGraph = connCompGraph.subgraph(vpred = (_, attr) => attr(pagerankIndex) == id)
  val pagerankGraph = clusterGraph.pageRank(0.15)

  // Append the in-cluster PageRank to each vertex's attribute list.
  val completeClusterPagerankGraph = clusterGraph.outerJoinVertices(pagerankGraph.vertices) {
    case (uid, attrList, Some(pr)) => attrList :+ ("inClusterPagerank:" + pr)
    case (uid, attrList, None)     => attrList :+ ""
  }

  // Collect every vertex of the cluster onto the driver and sort there.
  // (.collect replaces the deprecated .toArray; they behave identically.)
  val sortedClusterNodes = completeClusterPagerankGraph.vertices.collect.sortBy(_._2(pagerankIndex + 1))
  println(sortedClusterNodes(0)._2(1) + " with rank: " + sortedClusterNodes(0)._2(pagerankIndex + 1))
}
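Since the collect-and-sort at the end of the loop pulls every vertex of the cluster onto the driver, here is a sketch of a variant of those last two lines that keeps the ordering on the executors and ships back only the single lowest entry (same assumptions about connCompGraph and the string-typed attribute arrays as above):

// takeOrdered sorts on the executors and returns only `num` elements,
// instead of collecting the whole vertex RDD onto the driver first.
val lowest = completeClusterPagerankGraph.vertices
  .takeOrdered(1)(Ordering.by { case (_, attrList) => attrList(pagerankIndex + 1) })
lowest.headOption.foreach { case (_, attrList) =>
  println(attrList(1) + " with rank: " + attrList(pagerankIndex + 1))
}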
Many questions disguised as one. Thank you in advance!