6

I have an RDD[String], wordRDD. I also have a function that creates an RDD[String] from a string/word. I would like to create a new RDD for each string in wordRDD. Here are my attempts:

1) Failed because Spark does not support nested RDDs:

var newRDD = wordRDD.map( word => {
  // execute myFunction()
  (new MyClass(word)).myFunction()
})

2) Failed (possibly due to scope issue?):

var newRDD = sc.parallelize(new Array[String](0))
val wordArray = wordRDD.collect
for (w <- wordArray){
  newRDD = sc.union(newRDD,(new MyClass(w)).myFunction())
}

My ideal result would look like:

// input RDD (wordRDD)
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...)

// myFunction behavior
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl')

// after executing myFunction() on each word in wordRDD:
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...)

I found a relevant question here: Spark when union a lot of RDD throws stack overflow error, but it didn't address my issue.

Community
  • 1
  • 1
matsuninja
  • 275
  • 3
  • 12

2 Answers2

3

You cannot create a RDD from within another RDD.

However, it is possible to rewrite your function myFunction: String => RDD[String], which generates all words from the input where one letter is removed, into another function modifiedFunction: String => Seq[String] such that it can be used from within an RDD. That way, it will also be executed in parallel on your cluster. Having the modifiedFunction you can obtain the final RDD with all words by simply calling wordRDD.flatMap(modifiedFunction).

The crucial point is to use flatMap (to map and flatten the transformations):

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
  val sc = new SparkContext(sparkConf)

  val input = sc.parallelize(Seq("apple", "ananas", "banana"))

  // RDD("pple", "aple", ..., "nanas", ..., "anana", "bnana", ...)
  val result = input.flatMap(modifiedFunction) 
}

def modifiedFunction(word: String): Seq[String] = {
  word.indices map {
    index => word.substring(0, index) + word.substring(index+1)
  }
}
Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
Till Rohrmann
  • 13,148
  • 1
  • 25
  • 51
3

Use flatMap to get RDD[String] as you desire.

var allWords = wordRDD.flatMap { word => 
  (new MyClass(word)).myFunction().collect()
}
Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
Climbs_lika_Spyder
  • 6,004
  • 3
  • 39
  • 53
  • 2
    How is this supposed to run in parallel? Everything which happens within `wordRDD.map` is executed on the cluster. Thus, the inner `collect` has to trigger a new Spark job from within a running job. I suspect that it won't run distributedly. – Till Rohrmann Sep 10 '15 at 22:41
  • He could also alter the function to return arrays instead of RDDs, but the question didn't specify the actual function. – Climbs_lika_Spyder Sep 11 '15 at 20:33
  • But his description says that he has a function, I assumes it's `myFunction` that creates a `RDD[String]` from a string/word. – Till Rohrmann Sep 12 '15 at 23:15
  • Yes it does. Your answer tells him to change `myFunction` to return something different. Without knowing how complicated the function is it is hard to say if the calculations done in it are distributed or not. If collecting a data set means that all previous calculations done are no longer distributed, then nothing would be distributed. – Climbs_lika_Spyder Sep 15 '15 at 13:00
  • Have you ever tried running your code? It is basically not possible calling `collect` from within another `RDD`. How do you imagine this should be realized? The `collect` method will trigger the job execution and return the results as a `Seq` to your driver node. But the `flatMap` operation is part of your job which is executed distributedly. How shall the `collect` method then be executed? Moreover, the `SparkContext` which is necessary to generate new `RDDs` is simply not serializable. Thus, it is not possible to ship it with your UDF. – Till Rohrmann Sep 15 '15 at 13:15
  • 1
    I don't know what you mean by ship it, but I have run the code that I originally posted. It worked. @JacekLaskowski made my code more compact, but I assume it still works. – Climbs_lika_Spyder Sep 15 '15 at 13:27