
I have a custom class `E` which has, among other fields, a field `word`. I have a large `es: RDD[E]` with several hundred thousand elements and a `doc: Seq[String]` with typically a few hundred entries. In `es`, every element's `word` field value is unique.

My task is to look up the element in `es` for each of the strings in `doc`. It is, however, not guaranteed that such an element exists. My naive Scala/Spark implementation is thus:

def word2E(words: Seq[String]): Seq[E] = {
  // look up each word and keep only the words that were actually found
  words.map(lookupWord(_, es))
    .filter(_.isDefined)
    .map(_.get)
}

The method lookupWord() is defined as follows:

def lookupWord(w: String, es: RDD[E]): Option[E] = {
  // a full filter pass over es for every single lookup
  val lookup = es.filter(_.word.equals(w))

  if (lookup.isEmpty) None
  else Some(lookup.first)
}

When I look at the Spark stages overview, it seems like lookupWord() is a bottleneck. In particular, the isEmpty() calls in lookupWord take relatively long (up to 2s) in some cases.

I have already persisted the `es` RDD. Is there any other way to optimize such a task, or is this as good as it gets when operating on such a dataset?

I have noticed the `lookup()` method in `PairRDDFunctions` and considered constructing a `PairRDD` in which the `word` field would serve as the key; a sketch of that idea follows below. Might that help? Drawing any conclusions experimentally here is quite hard because there are so many factors involved.
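A minimal sketch of that idea (my own wiring, untested; it assumes Spark 1.3+, where the pair-RDD implicits are in scope automatically):

import org.apache.spark.rdd.RDD

// hypothetical: key es by its unique word field once and keep it cached
val byWord: RDD[(String, E)] = es.map(e => (e.word, e)).persist()

def word2E(words: Seq[String]): Seq[E] =
  // lookup() returns all values for a key; word is unique in es,
  // so headOption yields at most one element per word
  words.flatMap(w => byWord.lookup(w).headOption)

Note that without a known partitioner, `lookup()` still scans every partition; applying `partitionBy(new HashPartitioner(n))` to `byWord` first would let Spark search only the partition that owns the key.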

Carsten
  • you could try an indexed RDD – refer to the accepted answer here: http://stackoverflow.com/questions/24724786/distributed-map-in-scala-spark – blue-sky Aug 26 '15 at 09:04
  • IndexedRDD does look like it addresses my exact task. However, it is not yet available in the most recent Spark release (1.4.1 as of writing). – Carsten Aug 26 '15 at 09:17
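For context, the IndexedRDD approach from that answer would apply roughly as follows; since no release compatible with Spark 1.4.1 exists as of writing, the calls below are an assumption based on the project's README, not verified against a published artifact:

// hypothetical sketch, assuming the spark-indexedrdd README API
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._

// build an indexed view of es, keyed by the unique word field
val indexed = IndexedRDD(es.map(e => (e.word, e))).cache()

def word2E(words: Seq[String]): Seq[E] =
  words.flatMap(w => indexed.get(w))  // get() is assumed to return Option[E]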

1 Answer


The problem with your implementation is that, for each word in `words`, you trigger a complete traversal of your RDD and then collect the elements. One way to solve your problem is to join the sequence of words with your RDD:

import org.apache.spark.{SparkConf, SparkContext}

case class E(word: String, value: Int)

object App {

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)

    val entries = sc.parallelize(List(E("a", 1), E("b", 2), E("c", 3), E("c", 3)))

    val words = Seq("a", "a", "c")

    // key the words by themselves so they can be joined against the entries
    val wordsRDD = sc.parallelize(words).map(x => (x, x))

    val matchingEntries = entries
      .map(x => (x.word, x))  // key the entries by their word field
      .join(wordsRDD)         // one shuffle instead of one RDD scan per word
      .map {
        case (_, (entry, _)) => entry
      }
      .collect

    println(matchingEntries.mkString("\n"))

    sc.stop()
  }
}

The output is

E(a,1)
E(a,1)
E(c,3)
E(c,3)
Till Rohrmann
  • This looks promising, although I need a mapping between input strings and `E` elements; not a straight mapping, though: if two equal strings are contained in `doc`, the matching `E` should occur twice in the result sequence. This might be doable by emulating a multimap, I suppose. – Carsten Aug 26 '15 at 09:23
  • @Carsten, did I understand you correctly that you're actually looking for a join operation? That is, if `words=Seq("a", "a", "c")`, you want the output `Seq(E("a", 1), E("a", 1), E("c", 3), E("c", 3))`? – Till Rohrmann Aug 26 '15 at 09:29
  • @Carsten, I've updated my answer with a join solution. – Till Rohrmann Aug 26 '15 at 10:15
  • That answer indeed did the trick; the performance is now significantly better. I also tried the solution I briefly sketched initially, creating a `PairRDD` and using its `lookup()` method: also significantly faster than my naive version, but much slower than using `join()` (although making objective comparisons remains quite hard). – Carsten Aug 26 '15 at 12:05
  • You could also try a broadcast join, since the `words` sequence is quite small. See here http://stackoverflow.com/a/17690254/4815083 and the sketch after these comments. – Till Rohrmann Aug 26 '15 at 12:19
  • One more question: in the line `val wordsRDD = sc.parallelize(words).map(x => (x, x))`, why would you set the 2nd element to `x`? I understand that it is discarded after the join operation. Couldn't you save memory and write `map(x => (x, null))` instead? – Carsten Aug 26 '15 at 12:39
  • @Carsten, you're right. It's not necessary and setting the value to `null` should save some space :-) – Till Rohrmann Aug 26 '15 at 13:50
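For completeness, a minimal sketch of the broadcast-join idea from the last comments; the duplicate-counting scheme is an assumption about how to preserve the multiplicity discussed above:

// hypothetical: ship the small word list to every executor and filter es
// locally, avoiding the shuffle that join() incurs
val wordCounts = sc.broadcast(
  words.groupBy(identity).map { case (w, ws) => (w, ws.size) }
)

val matchingEntries = es
  .filter(e => wordCounts.value.contains(e.word))
  // replicate each hit once per occurrence of its word in doc
  .flatMap(e => Seq.fill(wordCounts.value(e.word))(e))
  .collect()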