0

Here is the code:

val words = sc.textFile("/Users/kaiyin/IdeaProjects/learnSpark/src/main/resources/eng_words.txt" )
words.take(1000000).foreach(println _)
words.take(150000).groupBy((x: String) => x.head).map {
  case (c, iter)  => (c, iter.toList.size)
}.foreach {
  println _
}

The eng_words.txt is a text file containing about 1 million English words, one per line. Once the RDD goes above 150000, groupBy will crash with this error:

java.util.NoSuchElementException: next on empty iterator
  at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
  at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
  at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
  at scala.collection.IterableLike$class.head(IterableLike.scala:107)
  at scala.collection.immutable.StringOps.scala$collection$IndexedSeqOptimized$$super$head(StringOps.scala:30)
  at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
  at scala.collection.immutable.StringOps.head(StringOps.scala:30)
  at $anon$1$$anonfun$run$1.apply(<console>:23)
  at $anon$1$$anonfun$run$1.apply(<console>:23)
  at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:332)
  at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:331)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:331)
  at scala.collection.mutable.ArrayOps$ofRef.groupBy(ArrayOps.scala:186)
  at $anon$1.run(<console>:23)
  at Helper.HasRun$class.newRun(HasRun.scala:21)
  at $anon$1.newRun(<console>:19)
  ... 55 elided

What went wrong?

Community
  • 1
  • 1
qed
  • 22,298
  • 21
  • 125
  • 196

1 Answers1

4

In this particular case it most likely cannot handle an empty string. Nevertheless don't groupBy, don't call toList and don't trust blindly that the input is well formated.

  • head will fail on empty line with the error you see

  • groupBy same as groupByKey requires all records for each key to fit into executor memory.

What you have here is yet another word count:

words
  // Make sure that it won't fail on empty string with
  // java.util.NoSuchElementException: next on empty iterator
  .flatMap(_.headOption) 
  // Map to pairs and reduce to avoid excessive shuffling and limit memory usage
  .map((_, 1))
  .reduceByKey(_ + _)
zero323
  • 322,348
  • 103
  • 959
  • 935
  • 1
    Wait, is that right? I thought executors could seamlessly spill what they need to disk if it doesn't have access to enough memory. Granted that comes with a big slowdown, but it should still work. Regardless, `reduceByKey` is better, like you suggested, because it lets the executor perform an aggregation before doing the rest. – Jeff Jun 29 '16 at 18:15
  • 1
    @JeffL. You're partially correct here. By all means data can be spill if needed but not for a single key. Values you get are just a variation about ArrayBuffer and as such have to fit in the memory. – zero323 Jun 29 '16 at 18:27
  • 1
    That's interesting, thanks. It's hard to find good info on what Spark does under the hood sometimes, when you get into the details like this. Also, some of this could be mitigated by the resources you assign to your executors for the cluster, I should think. This seems primarily important because `reduceByKey` doesn't have a dataframe equivalent, if I'm not mistaken. – Jeff Jun 29 '16 at 18:32
  • 2
    @JeffL. It is slightly different. `Data(set|Frame).groupBy` is a logical operation not a physical one. For DF you can check http://stackoverflow.com/q/32902982/1560062. Regarding RDD it is pretty much (in)famous [avoid groupByKey](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html). – zero323 Jun 29 '16 at 18:35