
I am attempting to perform a simple transformation of Common Crawl data using Spark hosted on EC2, following this guide. My code looks like this:

package ccminer

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ccminer {
  val english = "english|en|eng"
  val spanish = "es|esp|spa|spanish|espanol"
  val turkish = "turkish|tr|tur|turc"
  val greek = "greek|el|ell"
  val italian = "italian|it|ita|italien"
  val all = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|")

  def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*")

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println("Bad command line")
      System.exit(-1)
    }

    val cluster = "spark://???"
    val sc = new SparkContext(cluster, "Common Crawl Miner",
      System.getenv("SPARK_HOME"), Seq("/root/spark/ccminer/target/scala-2.10/cc-miner_2.10-1.0.jar"))

    sc.sequenceFile[String, String](args(0)).map {
      case (k, v) => (langIndep(k), v)
    }
    .groupByKey(args(2).toInt)
    .filter {
      case (_, vs) => vs.size > 1
    }
    .saveAsTextFile(args(1))
  }
}

And I am running it with the following command:

sbt/sbt "run-main ccminer.ccminer s3n://aws-publicdatasets/common-crawl/parse-output/segment/1341690165636/textData-* s3n://parallelcorpus/out/ 2000"

But it very quickly fails with errors like the following:

java.lang.OutOfMemoryError: Java heap space
at com.ning.compress.BufferRecycler.allocEncodingBuffer(BufferRecycler.java:59)
at com.ning.compress.lzf.ChunkEncoder.<init>(ChunkEncoder.java:93)
at com.ning.compress.lzf.impl.UnsafeChunkEncoder.<init>(UnsafeChunkEncoder.java:40)
at com.ning.compress.lzf.impl.UnsafeChunkEncoderLE.<init>(UnsafeChunkEncoderLE.java:13)
at com.ning.compress.lzf.impl.UnsafeChunkEncoders.createEncoder(UnsafeChunkEncoders.java:31)
at com.ning.compress.lzf.util.ChunkEncoderFactory.optimalInstance(ChunkEncoderFactory.java:44)
at com.ning.compress.lzf.LZFOutputStream.<init>(LZFOutputStream.java:61)
at org.apache.spark.io.LZFCompressionCodec.compressedOutputStream(CompressionCodec.scala:60)
at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:803)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:117)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

So my basic question is, what is necessary to write a Spark task that can group by key with an almost unlimited amount of input without running out of memory?


2 Answers


The most common cause of java.lang.OutOfMemoryError exceptions in shuffle tasks (such as groupByKey, reduceByKey, etc.) is a low level of parallelism.

You can increase the default value by setting the spark.default.parallelism property in your configuration.
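For example, a minimal sketch of setting this property when creating the context (the value 200 is an arbitrary illustration, not a recommendation; tune it to your cluster):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: raise the default shuffle parallelism.
// The value 200 is an arbitrary example; tune it to your cluster/system.
val conf = new SparkConf()
  .setAppName("Common Crawl Miner")
  .set("spark.default.parallelism", "200")
val sc = new SparkContext(conf)

// Alternatively, pass an explicit partition count to the shuffle operation itself,
// as the question's code already does via args(2):
// rdd.groupByKey(numPartitions)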

homutov
    Also coalescing to too few partitions can cause this. – jbrown Jun 18 '15 at 07:04
  • @jbrown [has a point](http://stackoverflow.com/questions/38961251/java-lang-outofmemoryerror-unable-to-acquire-100-bytes-of-memory-got-0)! – gsamaras Aug 15 '16 at 19:35
  • Notice that [reduceByKey() is indeed appearing to suffer from the same errors](http://stackoverflow.com/questions/29156275/spark-scalability-what-am-i-doing-wrong). – gsamaras Sep 15 '16 at 22:58

This says that you have run out of the JVM's allocated heap space. You can increase the heap size, but it is still limited by your system's capabilities (it cannot exceed the amount of physical RAM).

On the other hand, as explained by homutov, this happens in large collecting operations, for example groupByKey, reduceByKey, or cartesian + mapToPair. These operations gather a large amount of RDD data in one place, causing the JVM to run out of heap space.

What can you do?

In my experience, when a cluster/system has limited resources, you can use the Spark tuning guide. spark.default.parallelism can be increased until the tasks fit into your cluster/system (I once ran a KNN implementation on a 14,000-instance, 1,024-feature dataset on my laptop's virtual machine by tweaking the parallelism).

Command line flag: --conf spark.default.parallelism=4 (here 4 is the parallelism value)

Remember, you need to TUNE these settings to the most effective, failure-avoiding (not running out of heap) values to get the best results out of Spark.

Additionally

Remember to use primitive data types instead of wrappers, and arrays instead of collections.

e.g. List<Integer> vs int[]; int[] is better than List

In Spark, arrays can save a lot of valuable space and improve performance.
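As a rough Scala sketch of the same idea (the values are invented purely for illustration):

// Sketch: a boxed collection stores each number as a separate java.lang.Double
// object on the heap (object header plus pointer overhead per element),
// while a primitive array stores the raw doubles contiguously.
val boxed: List[java.lang.Double] = List(1.0, 2.0, 3.0).map(java.lang.Double.valueOf)
val primitive: Array[Double] = Array(1.0, 2.0, 3.0) // preferred when caching large RDDs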

Also, use broadcast variables instead of a Cartesian product or any other large combination task.
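A hedged sketch of this idea (the names smallLookup and bigRdd are invented for illustration and are not from the question):

// Sketch: broadcast a small lookup table to every executor instead of
// shuffling it or building a cartesian product with the big RDD.
// bigRdd: RDD[(String, String)] is a hypothetical pair RDD of (language tag, text).
val smallLookup: Map[String, String] = Map("en" -> "english", "es" -> "spanish")
val broadcastLookup = sc.broadcast(smallLookup)

val enriched = bigRdd.map { case (lang, text) =>
  // each task reads its local broadcast copy; nothing extra is shuffled
  (broadcastLookup.value.getOrElse(lang, "unknown"), text)
}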

Kavindu Dodanduwa
  • A good introduction about when, how and why to broadcast can be found at https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-broadcast.html – Boern Feb 13 '17 at 08:39
  • Typo: spark.default.parallelism not spark.defualt.parallelism – Remy Mar 06 '19 at 10:23