0

I have been trying to execute a scala program and the output somehow always seems to be something like this:

15/08/17 14:13:14 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:64)
at java.lang.StringBuilder.<init>(StringBuilder.java:97)
at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:339)
at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:83)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2344)
at org.json4s.jackson.JsonMethods$class.compact(JsonMethods.scala:32)
at org.json4s.jackson.JsonMethods$.compact(JsonMethods.scala:44)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:143)
at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:169)
at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56)
at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)

or like this

15/08/19 11:45:11 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.createInstance(DefaultSerializerProvider.java:526)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.createInstance(DefaultSerializerProvider.java:505)
    at com.fasterxml.jackson.databind.ObjectMapper._serializerProvider(ObjectMapper.java:2846)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:17)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:17)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:2881)

Are these errors on the driver or executor side?

I am a bit confused with the memory variables that Spark uses. My current settings are

spark-env.sh

export SPARK_WORKER_MEMORY=6G
export SPARK_DRIVER_MEMORY=6G
export SPARK_EXECUTOR_MEMORY=4G

spark-defaults.conf

# spark.driver.memory              6G
# spark.executor.memory            4G
# spark.executor.extraJavaOptions  ' -Xms5G -Xmx5G '
# spark.driver.extraJavaOptions   ' -Xms5G -Xmx5G '

Do I need to uncomment any of the variables contained in spark-defaults.conf, or are they redundant?

Is for example setting SPARK_WORKER_MEMORY equivalent to setting the spark.executor.memory?

Part of my scala code where it stops after a few iterations:

   val filteredNodesGroups = connCompGraph.vertices.map{ case(_, array) => array(pagerankIndex) }.distinct.collect
    for (id <- filteredNodesGroups){
        val clusterGraph = connCompGraph.subgraph(vpred = (_, attr) => attr(pagerankIndex) == id)
        val pagerankGraph = clusterGraph.pageRank(0.15)
        val completeClusterPagerankGraph = clusterGraph.outerJoinVertices(pagerankGraph.vertices) {
            case (uid, attrList, Some(pr)) => 
                attrList :+ ("inClusterPagerank:" + pr)
            case (uid, attrList, None) => 
                attrList :+ ""
        }
        val sortedClusterNodes = completeClusterPagerankGraph.vertices.toArray.sortBy(_._2(pagerankIndex + 1))
       println(sortedClusterNodes(0)._2(1) + " with rank: " + sortedClusterNodes(0)._2(pagerankIndex + 1))

     }        

Many questions disguised as one. Thank you in advance!

sofia
  • 440
  • 7
  • 20
  • It really depends of what you are trying to achieve, could you provide some code to illustrate ? – Francis Toth Aug 17 '15 at 13:13
  • Just to add that the master often dies after the failed execution. – sofia Aug 17 '15 at 13:17
  • I updated the original post to include the code – sofia Aug 17 '15 at 13:36
  • In most cases, it is probable to optimize your code to boost performance and avoid memory issues with JVM. Could you roughly specify the size of your data, and why you are using that for loop? – GameOfThrows Aug 17 '15 at 14:16
  • I can't see how I can optimise this more. What I am trying to do is break a graph into smaller graphs and calculate the page rank for each of the subgraphs. – sofia Aug 17 '15 at 14:28
  • Any answer as to what this error actually means (driver or executor error?) would be appreciated, as I will know which memory to try to increase. I get no other errors in the master, worker and executor logs. – sofia Aug 19 '15 at 09:26

1 Answers1

0

I'm not a Spark expert, but there is line that seems suspicious to me :

val filteredNodesGroups = connCompGraph.vertices.map{ case(_, array) => array(pagerankIndex) }.distinct.collect

Basically, by using the collect method, you are getting back all the data from your executors (before even processing it) to the driver. Do you have any idea about the size of this data ?

In order to fix this, you should proceed in a more functional way. To extract the distinct values, you could for example use a groupBy and map :

val pairs = connCompGraph.vertices.map{ case(_, array) => array(pagerankIndex) }
pairs.groupBy(_./* the property to group on */)
     .map { case (_, arrays) => /* map function */ }

Regarding the collect, there should be a way to sort each partition and then to return the (processed) result to the driver. I would like to help you more but I need more information about what you are trying to do.

UPDATE

After digging a little bit, you could sort your data using shuffling as described here

UPDATE

So far, I've tried to avoid the collect, and to get the data back to the driver as much as possible, but I've no idea how to solve this :

val filteredNodesGroups = connCompGraph.vertices.map{ case(_, array) => array(pagerankIndex) }.distinct()
val clusterGraphs = filteredNodesGroups.map { id => connCompGraph.subgraph(vpred = (_, attr) => attr(pagerankIndex) == id) }
val pageRankGraphs = clusterGraphs.map(_.pageRank(0.15))

Basically, you need to join two RDD[Graph[Array[String], String]], but I don't know what key to use, and secondly this would necessarily return an RDD of RDD (I don't know if you can even do that). I'll try to find something later this day.

Community
  • 1
  • 1
Francis Toth
  • 1,595
  • 1
  • 11
  • 23
  • I do indeed collect the groups in the beginning. `filteredNodesGroups` contains integers that represent a node each, therefore the iteration (which happens locally) computes a subgraph and then the pagerank on each subgraph. – sofia Aug 18 '15 at 14:54
  • The idea is simple: I need to break the graph to smaller graphs (according to one of the attributes that each node contains, it goes to a particular subgraph). Then, I need to calculate the pagerank on each of the subgraphs and print the node with the biggest pagerank. Most of the subgraphs are 2-20 nodes big and there is one subgraph that contains about 13.000 nodes and a few thousand edges. However, the calculation stops usually before that subgraph is reached. I can therefore not justify all this memory expense. Any ideas? – sofia Aug 18 '15 at 15:19
  • This is the complete code, inside a `def clusterElements(connCompGraph: Graph[Array[String],String], pagerankIndex: Int) = {...the code above...}` Indeed, connCompGraph is a Graph. – sofia Aug 20 '15 at 12:16