
Sorry for the confusion in the initial question. Here is the question with a reproducible example:

I have an RDD[String] and an RDD[(String, Long)]. I would like to get an RDD[Long] by matching the strings in the second RDD against the strings in the first. Example:

//Create RDD
val textFile = sc.parallelize(Array("Spark can also be used for compute intensive tasks",
      "This code estimates pi by throwing darts at a circle"))
// tokenize; result: RDD[String]
val words = textFile.flatMap(line => line.split(" "))
// create an index of the distinct words; result: RDD[(String, Long)]
val indexWords = words.distinct().zipWithIndex()

As a result, I would like to have an RDD with indexes of words instead of words in "Spark can also be used for compute intensive tasks".

Sorry again and thanks

aigujin
  • I suggest you have a look at the map-function. – Glennie Helles Sindholt Apr 12 '16 at 09:09
  • Since you can have many arrays inside each RDD then which pair of array will you use to get the result? or based on index? – iboss Apr 12 '16 at 09:35
  • @iboss The resulted rdd will be `values` of `y` based on the matching `keys` with `x`. – aigujin Apr 12 '16 at 10:01
  • I think you're confusing us all - should the Arrays in the example code actually be RDDs? In other words, is the first RDD of type `RDD[Array[String]]` or `RDD[String]`? If it's the latter and you're just "replacing" RDDs with Arrays for the example - please don't. If it's the former - please create a full example with RDDs with the expected results. – Tzach Zohar Apr 12 '16 at 10:37
  • "paired rdd of `Array[String, Long]`" - still does not make sense. Is it an `RDD[(String, Long)]` (which can indeed be used as a PairRDD) or an `RDD[Array[(String, Long)]]`? If it's the latter, give a full example. If it's the former - why replace RDDs with Arrays in the example? – Tzach Zohar Apr 12 '16 at 10:52

2 Answers

If I understand you correctly, you're interested in the indices of the words that also appear in "Spark can also be used for compute intensive tasks".

If so - here are two versions with identical outputs but different performance characteristics:

import org.apache.spark.rdd.RDD

val lookupWords: Seq[String] = "Spark can also be used for compute intensive tasks".split(" ")

// option 1 - use join:
val lookupWordsRdd: RDD[(String, String)] = sc.parallelize(lookupWords).keyBy(w => w)
val result1: RDD[Long] = indexWords.join(lookupWordsRdd).map { case (key, (index, _)) => index }

// option 2 - assuming list of lookup words is short, you can use a non-distributed version of it
val result2: RDD[Long] = indexWords.collect { case (key, index) if lookupWords.contains(key) => index }

The first option creates a second RDD with the words whose indices we're interested in, uses keyBy to transform it into a PairRDD (with key == value!), joins it with your indexWords RDD and then maps to get the index only.

The second option should only be used if the list of "interesting words" is known not to be too large, so that we can keep it as a local list (not an RDD) and let Spark serialize it and send it to the workers for each task to use. We then use collect(f: PartialFunction[T, U]), which applies the partial function to get a "filter" and a "map" at once: a value is returned only if the word exists in the list, and in that case the value returned is the index.
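To illustrate the "filter and map at once" behaviour of collect with a partial function, here is a minimal sketch on plain Scala collections (no Spark needed). The object name and the sample data are hypothetical stand-ins for indexWords and lookupWords; the partial function itself has the same shape as the one in option 2.

```scala
object CollectSketch {
  // Hypothetical sample data standing in for the indexWords RDD
  val indexWords: Seq[(String, Long)] =
    Seq("Spark" -> 0L, "code" -> 1L, "tasks" -> 2L, "pi" -> 3L)

  // A Set gives constant-time contains, unlike Seq.contains which scans linearly
  val lookupWords: Set[String] = Set("Spark", "tasks")

  // Two steps: keep the matching pairs, then extract the index
  val viaFilterMap: Seq[Long] =
    indexWords.filter { case (word, _) => lookupWords.contains(word) }.map(_._2)

  // One step: the partial function is only defined for matching pairs
  val viaCollect: Seq[Long] =
    indexWords.collect { case (word, index) if lookupWords.contains(word) => index }

  def main(args: Array[String]): Unit = {
    println(viaFilterMap) // List(0, 2)
    println(viaCollect)   // List(0, 2)
  }
}
```

Turning lookupWords into a Set is an optional tweak rather than part of the original answer; it may help when the lookup list has more than a handful of entries.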

Tzach Zohar
  • Thank you for the answer. I applied option 1 to my data and got an RDD of this type: `org.apache.spark.rdd.RDD[org.apache.spark.rdd.RDD[Long]]`. My dataset is an `RDD[String]` and consists of sentences, so I use `map` to apply your code to each sentence. To make the `join` work I use `sc.parallelize(x)` inside of `map`. In the end I got nested RDDs and I don't know how to get to a `String` type. – aigujin Apr 13 '16 at 11:11
  • That won't work, you _cannot_ create `RDD[RDD[..]]`, and you cannot use a `SparkContext` inside an RDD transformation (e.g. `map`), don't even try that. I don't really get what you're trying to do (should each distinct word be mapped to its index in every sentence? What's the desired result type?) so I can't suggest an alternative, but usually whenever you think you need "nested RDDs" you actually should use `RDD.join` or `RDD.cartesian`. – Tzach Zohar Apr 13 '16 at 11:23
  • You're right: I would like each word in a sentence to be mapped to its index, for each sentence. The collection of indices is already done and you showed how to do the mapping for a single sentence. Now I would like to apply it to an RDD via a `map` transformation. The desired output is an `RDD[String]` with integers (instead of words) for each sentence. – aigujin Apr 13 '16 at 11:46
  • Hi, could you update your answer to apply your solution to variable `textFile` from my example (i.e., on both initial sentences). – aigujin Apr 14 '16 at 07:40

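I was getting the SPARK-5063 error (RDD transformations and actions can only be invoked by the driver, not inside other transformations), and thanks to this answer I found the solution to my problem: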

//broadcast `indexWords`
val bcIndexWords = sc.broadcast(indexWords.collectAsMap)
// select `value` of `indexWords` given `key`
val result = textFile.map{arr => arr.split(" ").map(elem => bcIndexWords.value(elem))}
result.first()
res373: Array[Long] = Array(3, 7, 14, 6, 17, 15, 0, 12)
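One thing to watch for with this approach: `bcIndexWords.value(elem)` throws a NoSuchElementException if a word is missing from the map. Below is a minimal sketch of the same lookup on plain Scala collections, using `getOrElse` with a sentinel value instead. The object name, the `encode` helper and the `-1L` sentinel are assumptions made for illustration and are not part of the code above.

```scala
object IndexLookupSketch {
  val sentences: Seq[String] = Seq(
    "Spark can also be used for compute intensive tasks",
    "This code estimates pi by throwing darts at a circle")

  // Local equivalent of words.distinct().zipWithIndex().collectAsMap.
  // On a local Seq, distinct keeps first-occurrence order, so indices are stable here.
  val index: Map[String, Long] =
    sentences.flatMap(_.split(" ")).distinct.zipWithIndex
      .map { case (word, i) => word -> i.toLong }
      .toMap

  // Hypothetical sentinel for words that are not in the index
  val Unknown: Long = -1L

  // Replace every word of a sentence with its index, or Unknown if it was never indexed
  def encode(sentence: String): Array[Long] =
    sentence.split(" ").map(word => index.getOrElse(word, Unknown))

  def main(args: Array[String]): Unit =
    sentences.foreach(s => println(encode(s).mkString(" ")))
}
```

In the Spark version the same idea would presumably be `bcIndexWords.value.getOrElse(elem, -1L)` inside the `map`. Note that in Spark the indices depend on how `distinct()` happens to order the words, so the actual numbers will generally differ from this local sketch.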
aigujin