8

RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

As the error says, i'm trying to map(transformation) a JavaRDD object within the main map function, how is it possible with Apache Spark?

The main JavaPairRDD object (TextFile and Word are defined classes):

JavaPairRDD<TextFile, JavaRDD<Word>> filesWithWords = new...

and map function:

filesWithWords.map(textFileJavaRDDTuple2 -> textFileJavaRDDTuple2._2().map(word -> new Word(word.getText(), (long) textFileJavaRDDTuple2._1().getText().split(word.getText()).length)));

also i tried foreach instead map function, but not working. (And of course searched SPARK-5063)

Alper M.
  • 143
  • 3
  • 8
  • see also: http://stackoverflow.com/questions/29815878/how-to-deal-with-error-spark-5063-in-spark – maasg May 02 '15 at 09:01

3 Answers3

9

In the same way nested operations on RDDs are not supported, nested RDD types are not possible in Spark. RDDs are only defined at the driver where, in combination with their SparkContext they can schedule operations on the data they represent.

So, the root cause we need to address in this case is the datatype:

JavaPairRDD<TextFile, JavaRDD<Word>> filesWithWords

Which in Spark will have no possible valid use. Depending on the usecase, which is not further explained in the question, this type should become one of:

A collection of RDDs, with the text file they refer to:

Map<TextFile,RDD<Word>>

Or a collection of (textFile,Word) by text file:

JavaPairRDD<TextFile, Word>

Or a collection of words with their corresponding TextFile:

JavaPairRDD<TextFile, List<Word>>

Once the type is corrected, the issues with the nested RDD operations will be naturally solved.

maasg
  • 37,100
  • 11
  • 88
  • 115
  • thank you very much for your answer. I tried to use Map like this: Map> textMap = filesWithWords.collectAsMap(); textMap.forEach((textFile, wordJavaRDD) -> wordJavaRDD.map(word -> /* some transformation*/ )); but again returns same error. – Alper M. May 02 '15 at 10:10
  • @Alp `collectAsMap()` will not give you back a `Map> `. What are you trying to do btw? To me, it looks like the construction you're attempting is rather contrived. – maasg May 02 '15 at 10:22
  • `collectAsMap()` gives map (`java.util.Map`), i've checked it. I think you mean `scala.collection.Map`? _TextFile_ class has _Path_ and _Text_ properties of any file, _Word_ class has _word_ and its _count_ And i'm trying to count each word in current text file. Also `JavaRDD` has all words that used in all text files not only current file, so i can't use simple wordCount examples – Alper M. May 02 '15 at 10:43
  • @Alp that is an extension of word count, counting (file, word) pairs instead. – maasg May 02 '15 at 11:29
  • Why not? Maybe you could post another question with the code you are trying. That approach should work, so it's probably some issue related to your code. – maasg May 02 '15 at 16:18
3

When I got to this exact same point in my learning curve for Spark (tried and failed to use nested RDDs) I switched to DataFrames and was able to accomplish the same thing using joins instead. Also, in general, DataFrames appear to be almost twice as fast as RDDs -- at least for the work I have been doing.

David Griffin
  • 13,677
  • 5
  • 47
  • 65
0

@maasg Firstly i used JavaPairRDD< TextFile, JavaRDD < Word > >, and it didn't work as you and @David Griffin said, it's not possible yet. Models:

TextFile(String path, String text)

Word(String word, Integer count)

Now using JavaRDD < TextFile > and models have changed as:

TextFile(String path, String text, List< Word > wordList)

Word(String word, Integer count)

Finally,

List<Word> countDrafts = wordCount.map(v11 -> new Word(v11._1(), (long) 0)).collect();
JavaRDD<TextFile> ft = fileTexts.map(v11 -> new TextFile(v11._1(), v11._2(), countDrafts));
ft.foreach(textFile -> textFile.getWordList().forEach(word -> new  Word(word.getText(), getWordCountFromText(textFile.getText(), word.getText())))); 

getWordCountFromText() function counts word in text of TextFile object, but not using spark reduce method unfortunately, using classic way.

By the way, i will try DataFrames in next days, but i have short time to do this.

Thank you all.

Alper M.
  • 143
  • 3
  • 8
  • Instead of writing an answer to respond maasg's and David's answers, you might want to edit your post with the information above and delete this answer. – Mikel Urkia May 04 '15 at 12:23
  • @MikelUrkia http://stackoverflow.com/questions/29996427/how-to-solve-spark-5063-in-nested-map-functions/30029550?noredirect=1#comment48127506_30000494 – Alper M. May 06 '15 at 07:17
  • What @maasg says is that you might want to write a new **question** - not an answer to your existing question - with the code you are trying, mainly because the *new* problem you have identified on his answer is not the same as the one posted in this question. That way it would be easier to give an answer to your *new* question. – Mikel Urkia May 06 '15 at 12:31
  • 1
    it is not big deal or should not be. just a clear answer to my question by me for devs who face with same problem – Alper M. May 08 '15 at 12:05
  • Maybe you are right, @Alp. You give an answer to your question, that is true. Just trying to help you out with your *new problem*. Cheers! – Mikel Urkia May 08 '15 at 13:21