
So I'm running into an issue where a filter I'm using on an RDD can potentially create an empty RDD. I feel that doing a count() in order to test for emptiness would be very expensive, and was wondering if there is a more performant way to handle this situation.

Here is an example of what this issue might look like:

    val b: RDD[String] = sc.parallelize(Seq("a", "ab", "abc"))
    println(b.filter(a => !a.contains("a")).reduce(_ + _))

would give the result

java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$36.apply(RDD.scala:1005)
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$36.apply(RDD.scala:1005)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1005)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)

Does anyone have any suggestions for how I should go about addressing this edge case?

Daniel Imberman

2 Answers


Consider `.fold("")(_ + _)` instead of `.reduce(_ + _)`. Unlike `reduce`, `fold` takes a zero element and simply returns it when the RDD is empty instead of throwing.
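A minimal sketch of the difference, reusing the question's `sc` and filter:

    val empty = sc.parallelize(Seq("a", "ab", "abc")).filter(a => !a.contains("a"))

    // reduce has no starting value, so it throws on an empty RDD:
    // empty.reduce(_ + _)  // java.lang.UnsupportedOperationException: empty collection

    // fold starts from the zero element ("" here) and simply returns it
    // when there is nothing to combine:
    val result = empty.fold("")(_ + _)  // result: String = ""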

Dima
  • Hi Dima, this is very interesting. Could you please explain? It would seem from this answer that foldLeft might be a far more expensive function: http://stackoverflow.com/a/25158790/2159333. Is it that the increased cost of running a foldLeft would be less than the extra steps involved in an isEmpty or a count? – Daniel Imberman Dec 17 '15 at 16:59
  • @DanielImberman The answer you referenced is about `foldLeft`, not `fold`. `fold` does not have the extra cost it is talking about; `foldLeft` does, although I do think that cost would still be significantly lower than that of counting the number of elements in the RDD. – Dima Dec 18 '15 at 02:18
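For reference, a minimal illustration of the `fold` / `foldLeft` distinction on plain Scala collections (not Spark-specific):

    val xs = List("a", "ab", "abc")

    // fold: the zero value and the operands share one type and the operator
    // must be associative, which is what lets RDD.fold merge per-partition
    // results:
    val concatenated: String = xs.fold("")(_ + _)        // "aababc"

    // foldLeft: the accumulator may have a different type and the traversal
    // is strictly left-to-right, which is where the sequential cost discussed
    // in the linked answer comes from:
    val totalLength: Int = xs.foldLeft(0)(_ + _.length)  // 6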

How about:

    scala> val b = sc.parallelize(Seq("a","ab","abc"))
    b: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24

    scala> b.isEmpty
    res1: Boolean = false
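Applied to the question's case, the guard might look like this (a sketch, reusing the `b` defined above):

    val filtered = b.filter(a => !a.contains("a"))

    // isEmpty stops as soon as it finds a single element (a take(1)
    // internally), so it avoids the full scan that count() would do:
    if (filtered.isEmpty) println("empty collection")
    else println(filtered.reduce(_ + _))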
Roberto Congiu
  • `isEmpty` does a `take(1)`, so it can also be expensive. – Daniel Darabos Dec 11 '15 at 11:07
  • Not nearly as expensive as a `count()`, `filter()`, `reduce()`, or `fold()` would be, since they all do a full scan. – Roberto Congiu Dec 11 '15 at 18:08
  • In my experience it is typical that the run time is dominated by shuffles. `take(1)` will trigger shuffles just as much as `count`. On the other hand the shuffle results will be re-used between `isEmpty` and `reduce` so maybe it's not so terrible. But `fold` is definitely the way to go! – Daniel Darabos Dec 11 '15 at 23:06
  • I don't think that either count or take(1) will cause a shuffle. `fold` will still be faster than `count` (which is simply an additional wasted reduce), and approximately as fast as `isEmpty`. My preference of `fold` vs. `isEmpty` is more of aesthetic nature, as it makes the code more linear, idiomatic and readable. I don't think there is any significant difference from the performance standpoint. – Dima Dec 18 '15 at 02:46