
I have a Spark job (running on Spark 1.3.1) that has to iterate over several keys (about 42) and process each of them. Here is the structure of the program:

  1. Get the key from a map
  2. Fetch the data matching the key from Hive (Hadoop/YARN underneath) as a data frame
  3. Process data
  4. Write results to hive

When I run this for one key, everything works fine. When I run it with all 42 keys, I get an out-of-memory exception around the 12th iteration. Is there a way I can free the memory between iterations? Help appreciated.

Here is the high-level code I am working with.

import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;

public abstract class SparkRunnableModel {

    public static SparkContext sc = null;
    public static JavaSparkContext jsc = null;
    public static HiveContext hiveContext = null;
    public static SQLContext sqlContext = null;

    protected SparkRunnableModel(String appName) {
        // Get the system properties to set up the model and build the Spark context
        SparkConf conf = new SparkConf().setAppName(appName);
        sc = new SparkContext(conf);
        jsc = new JavaSparkContext(sc);

        // Hive context for reading from and writing to Hive
        hiveContext = new HiveContext(sc);

        // Plain SQL context
        sqlContext = new SQLContext(sc);
    }

    public abstract void processModel(Properties properties) throws Exception;
}

class ModelRunnerMain(model: String) extends SparkRunnableModel(model) with Serializable {

  override def processModel(properties: Properties) = {
    val dataLoader = DataLoader.getDataLoader(properties)

    // loads the keys data frame from a keys table in Hive and converts it to a list
    val keysList = dataLoader.loadSeriesData()

    for (key <- keysList) {
      runModelForKey(key, dataLoader)
    }
  }

  def runModelForKey(key: String, dataLoader: DataLoader) = {

    // loads a data frame (~50 columns x 800 rows) using "select * from table where key='<key>'"
    val keyDataFrame = dataLoader.loadKeyData()

    // filter this data frame into two data frames
    ...

    // join them to transpose
    ...

    // convert the data frame into an RDD
    ...

    // run a map over the RDD to add a bunch of new columns
    ...
  }
}

My data frame size is under a megabyte, but I create several data frames from it by selecting, joining, and so on. I assume all of these get garbage collected once the iteration is done.
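In case it helps, this is the kind of explicit cleanup between iterations I have in mind (a sketch only; clearCache/unpersist only release data that was explicitly cached, which is part of what I am unsure about, and keysList, dataLoader and hiveContext are the fields shown above):

for (key <- keysList) {
  runModelForKey(key, dataLoader)

  // Drop anything that was persisted during this iteration so the next one starts clean.
  hiveContext.clearCache()   // uncaches every cached table/DataFrame in this context
  // or, per DataFrame inside runModelForKey: someDataFrame.unpersist()
}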

Here is the configuration I am running with (a rough SparkConf equivalent is sketched after the list).

  • spark.eventLog.enabled: true
  • spark.broadcast.port: 7086
  • spark.driver.memory: 12g
  • spark.shuffle.spill: false
  • spark.serializer: org.apache.spark.serializer.KryoSerializer
  • spark.storage.memoryFraction: 0.7
  • spark.executor.cores: 8
  • spark.io.compression.codec: lzf
  • spark.shuffle.consolidateFiles: true
  • spark.shuffle.service.enabled: true
  • spark.master: yarn-client
  • spark.executor.instances: 8
  • spark.shuffle.service.port: 7337
  • spark.rdd.compress: true
  • spark.executor.memory: 48g
  • spark.executor.id:
  • spark.sql.shuffle.partitions: 700
  • spark.cores.max: 56
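For reference, most of these could equivalently be set on the SparkConf before the context is created. This is only a sketch with a subset of the settings; note that spark.driver.memory has to be supplied before the driver JVM starts (via spark-submit or spark-defaults.conf), so setting it here would have no effect:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName(appName)
  .setMaster("yarn-client")
  .set("spark.executor.memory", "48g")
  .set("spark.executor.instances", "8")
  .set("spark.executor.cores", "8")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.storage.memoryFraction", "0.7")
  .set("spark.sql.shuffle.partitions", "700")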

Here is the exception I am getting.

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.util.io.ByteArrayChunkOutputStream.allocateNewChunkIfNeeded(ByteArrayChunkOutputStream.scala:66)
at org.apache.spark.util.io.ByteArrayChunkOutputStream.write(ByteArrayChunkOutputStream.scala:55)
at com.ning.compress.lzf.ChunkEncoder.encodeAndWriteChunk(ChunkEncoder.java:264)
at com.ning.compress.lzf.LZFOutputStream.writeCompressedBlock(LZFOutputStream.java:266)
at com.ning.compress.lzf.LZFOutputStream.write(LZFOutputStream.java:124)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:124)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:839)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1042)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1038)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1038)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1390)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
  • this would be hard to answer without seeing some code that reproduces the issue. Generally speaking Spark should be able to let GC collect data that isn't needed anymore, but the devil is in the details... – Tzach Zohar Oct 31 '16 at 16:43
  • I totally agree with @TzachZohar thus I'm voting to close it as being broad without a minimum verifiable complete example. – eliasah Oct 31 '16 at 16:59
  • Thank you both. I will add the code. The problem is that the stack is so generic, I don't know which part I should give. I will extract the important parts and add it to my question. – sbh Oct 31 '16 at 17:11
  • I have updated the post with code. Please take a look. – sbh Oct 31 '16 at 17:58
  • Could this be the static hiveContext that has references to all the objects? I have a java class that extends this SparkRunnable model and does similar steps. That seems to be working fine. OOM happens with scala class. I must be doing something wrong there. – sbh Oct 31 '16 at 19:44

1 Answer


Using checkpoint() or localCheckpoint() to cut the Spark lineage can improve the performance of the application across iterations.
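A minimal sketch of what that could look like here, assuming Spark 1.3.x, where DataFrame has no checkpoint() method and RDD.localCheckpoint() only arrived in later releases, so the lineage is cut on the underlying RDD. The checkpoint directory is a placeholder, and sc, sqlContext, dataLoader and keysList refer to the question's code:

import org.apache.spark.sql.DataFrame

// Placeholder path; set once, before the loop starts.
sc.setCheckpointDir("hdfs:///tmp/model-checkpoints")

// Returns a DataFrame whose lineage starts at the checkpointed data
// rather than at the Hive scan and the joins that produced it.
def checkpointed(df: DataFrame): DataFrame = {
  val rdd = df.rdd     // underlying RDD[Row]
  rdd.checkpoint()     // mark for checkpointing (lazy)
  rdd.count()          // run an action so the checkpoint is actually written
  sqlContext.createDataFrame(rdd, df.schema)
}

for (key <- keysList) {
  val keyDataFrame = checkpointed(dataLoader.loadKeyData())
  // ... the rest of runModelForKey as before
}

Since the stack trace shows the driver running out of heap while broadcasting task binaries, truncating the lineage also keeps the serialized DAG from growing with every iteration, which is presumably why this tends to help in loops like this one.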