Is there any Spark function that splits a collection into several RDDs according to some criteria? Such a function would avoid excessive iteration. For example:

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
    val logFile = "file.txt"
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    // Cache the input so the second filter does not re-read the file
    val logData = sc.textFile(logFile, 2).cache()
    val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
    val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
}

In this example I have to iterate over `logData` twice just to write the results to two separate files:

    val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
    val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")

It would be nice instead to have something like this:

    val resultMap = logData.map(line => if (line.contains("a")) ("a", line) else if (line.contains("b")) ("b", line) else (" - ", line))
    resultMap.writeByKey("a", "linesA.txt") 
    resultMap.writeByKey("b", "linesB.txt")

Any such thing?

zork

2 Answers

Maybe something like this would work:

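import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// The ClassTag context bound is required by the flatMap calls below
def singlePassMultiFilter[T: ClassTag](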
      rdd: RDD[T],
      f1: T => Boolean,
      f2: T => Boolean,
      level: StorageLevel = StorageLevel.MEMORY_ONLY
  ): (RDD[T], RDD[T], Boolean => Unit) = {
  val tempRDD = rdd mapPartitions { iter =>
    val abuf1 = ArrayBuffer.empty[T]
    val abuf2 = ArrayBuffer.empty[T]
    for (x <- iter) {
      if (f1(x)) abuf1 += x
      if (f2(x)) abuf2 += x
    }
    Iterator.single((abuf1, abuf2))
  }
  tempRDD.persist(level)
  val rdd1 = tempRDD.flatMap(_._1)
  val rdd2 = tempRDD.flatMap(_._2)
  (rdd1, rdd2, (blocking: Boolean) => tempRDD.unpersist(blocking))
}

Note that an action called on rdd1 (resp. rdd2) will cause tempRDD to be computed and persisted. This is practically equivalent to computing rdd2 (resp. rdd1) as well, since the overhead of the flatMap in the definitions of rdd1 and rdd2 is, I believe, going to be pretty negligible.

You would use singlePassMultiFilter like so:

val (rdd1, rdd2, cleanUp) = singlePassMultiFilter(rdd, f1, f2)
rdd1.persist()    //I'm going to need `rdd1` more later...
println(rdd1.count)  
println(rdd2.count) 
cleanUp(true)     //I'm done with `rdd2` and `rdd1` has been persisted so free stuff up...
println(rdd1.distinct.count)

Clearly this could be extended to an arbitrary number of filters, collections of filters, etc.
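As a rough illustration of that extension, here is a minimal sketch that takes a sequence of predicates and returns one RDD per predicate, still iterating over each partition only once. The name `singlePassFilters` and its signature are hypothetical, not part of Spark; it assumes the predicates are serializable and that the buffered results of each partition fit in memory.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical generalization of the two-filter version: one output RDD per predicate.
def singlePassFilters[T: ClassTag](
    rdd: RDD[T],
    predicates: Seq[T => Boolean],
    level: StorageLevel = StorageLevel.MEMORY_ONLY
): (Seq[RDD[T]], Boolean => Unit) = {
  val preds = predicates.toVector // indexed access inside the loop
  val tempRDD = rdd.mapPartitions { iter =>
    // One buffer per predicate, filled in a single pass over the partition
    val buffers = Vector.fill(preds.size)(ArrayBuffer.empty[T])
    for (x <- iter; i <- preds.indices) {
      if (preds(i)(x)) buffers(i) += x
    }
    Iterator.single(buffers)
  }
  tempRDD.persist(level)
  // Each output RDD just unpacks its own buffer from the persisted intermediate RDD
  val outputs = preds.indices.map(i => tempRDD.flatMap(buffers => buffers(i)))
  (outputs, (blocking: Boolean) => { tempRDD.unpersist(blocking); () })
}
```

It would be used the same way as the two-filter version: run actions on (or persist) the returned RDDs, then call the clean-up function once the intermediate RDD is no longer needed.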

Jason Scott Lenderman
  • It can be more efficient at a very low level (CPU cache utilization and the like, although that gain can be eaten up by ArrayBuffer maintenance), but at a higher level it performs exactly the same amount of work as repeated filters on a cached `rdd`. Still, it looks much better than the accepted answer, which actually makes the situation worse. – zero323 Oct 10 '15 at 13:53
  • The approach I suggested only requires iterating through `rdd` once, while repeatedly filtering on `rdd` will necessitate iterating through `rdd` multiple times. Also, if one knows that the filtering criteria are mutually exclusive (probably true for many uses), then greater efficiency can be obtained by simply replacing the two `if` statements with a single `if-else` statement. Such an optimization would not be possible by repeatedly filtering on `rdd`. – Jason Scott Lenderman Oct 10 '15 at 17:22
  • One way or another, for N elements and M conditions, it is still O(NM). If the RDD wasn't cached in memory there would be a huge practical difference, but otherwise it doesn't really matter. Anyway, I still think it is a much better approach than partitioning, hence the upvote. – zero323 Oct 10 '15 at 17:30
  • Yes, but there is a cost to just doing the iteration. At the level of machine instructions this involves increasing a counter and comparing against 0 (or, even worse, loading a register, and comparing against that register) for each element in the collection that you're iterating over. If you can do all the work in a single iteration over the collection then you save some computation. It's not a huge savings, but it is noticeable, especially if computations involved in the filtering criteria are not expensive. – Jason Scott Lenderman Oct 10 '15 at 17:39
  • Yes, although using `ArrayBuffers` is not cheap either. Moreover, with non-exclusive conditions you need more memory to cache `tempRDD` than `rdd`. Finally, it requires an additional transformation, which is quite expensive even if there is almost nothing to do. If I have some spare time I'll try to perform some tests, but I am not very optimistic. – zero323 Oct 10 '15 at 17:47
  • Assuming a decent implementation, `ArrayBuffers` shouldn't be too bad. I believe adding elements has constant amortized cost, but maybe there's a better way of dealing with that. As for your concern about additional memory usage, if you're not going to cache the resulting `RDDs` then you wouldn't want to use this approach in the first place. Finally, the final transformations, i.e. the `flatMaps`, are just "sewing" together a bunch of `Iterators` (one for each split of the data), so the cost of that should be pretty negligible compared to iterating through the data multiple times. – Jason Scott Lenderman Oct 10 '15 at 17:54

Have a look at the following question.

Write to multiple outputs by key Spark - one Spark job

You can flatMap an RDD with a function like the following and then do a groupBy on the key.

def multiFilter(words:List[String], line:String) = for { word <- words; if line.contains(word) } yield { (word,line) }
val filterWords = List("a","b")
val filteredRDD = logData.flatMap( line => multiFilter(filterWords, line) ) 
val groupedRDD = filteredRDD.groupBy(_._1) 

But depending on the size of your input RDD, you may or may not see any performance gain, because any groupBy operation involves a shuffle.
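If the end goal is only to write one output location per key, the keyed pairs produced by the flatMap can be written directly, without a groupBy. The following is a minimal sketch of that idea, assuming Hadoop's old-API `MultipleTextOutputFormat` is on the classpath. The class name `KeyedTextOutputFormat`, the helper `writeByKey`, and the `<outputDir>/<key>/part-NNNNN` layout are illustrative choices, not something the answer above prescribes.

```scala
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.rdd.RDD

// Hypothetical output format: routes each record to a sub-directory named after its key.
class KeyedTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Drop the key from the written record so only the line itself is stored
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()

  // `name` is the default leaf file name (e.g. part-00000); keeping it avoids
  // collisions between tasks that write records for the same key
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    s"$key/$name"
}

// Hypothetical helper: writes every (key, line) pair under outputDir/<key>/ in one job.
def writeByKey(pairs: RDD[(String, String)], outputDir: String): Unit =
  pairs.saveAsHadoopFile(outputDir, classOf[String], classOf[String], classOf[KeyedTextOutputFormat])
```

With the names from the snippet above, this would be called as `writeByKey(filteredRDD, "linesByWord")`, where the output directory name is again only an example and must not exist yet.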

On the other hand, if you have enough memory in your Spark cluster, you can cache the input RDD, so running multiple filter operations may not be as expensive as you think.
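A minimal sketch of that caching approach, which also yields a collection of RDDs keyed by filter word. The helper name `filterPerWord` is hypothetical, and the sketch assumes the input fits in the cluster's memory once cached.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical helper: one lazily evaluated filtered RDD per word, all reading the same cached input.
def filterPerWord(lines: RDD[String], words: Seq[String]): Map[String, RDD[String]] = {
  // Cache only if the caller has not already chosen a storage level
  val cached = if (lines.getStorageLevel == StorageLevel.NONE) lines.cache() else lines
  words.map(word => word -> cached.filter(_.contains(word))).toMap
}
```

Each RDD in the resulting map can then be saved or processed independently, for example `filterPerWord(logData, List("a", "b"))("a").saveAsTextFile("linesA.txt")`. The first action materializes the cache and later ones reuse it, so the input file is read only once, although each filter still scans the cached data.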

Soumya Simanta
  • Thanks. Yet when called on a dataset of (K, V) pairs, `groupBy` returns a **single RDD** containing a dataset of (K, Iterable) pairs. So is there no way to get a **collection of RDDs** as a result of Spark transformations? – zork Dec 02 '14 at 10:55