0

I'm having an issue with a function(which I can't modify) needing an RDD as input, but my data is in such a format that I can't seem to get just an RDD into the function.

Consider an RDD that was created by a groupby such that it consists of ("name", data) pairs, called coolRdd. The data is an Iterable[String], and the name is a String. However, I need to run CoolFunction on it, which takes type (Rdd[String], String). Here was my attempt:

coolRdd.foreach{ case (name, data) => sc.CoolFunction(data.toList, name) }

which returns

found   : List[String]
required: org.apache.spark.rdd.RDD[String]

I also tried running sc.parallelize on the data.toList, but that gives a nullPointer because it would create an RDD of RDDs which Spark doesn't allow.

I'm wondering if it's possible to write another function that can do the conversion on data, and then call to the necessary CoolFunction. It would be better if I didn't have to do this on the driver, but if necessary that's doable.

As a bonus: I'm actually doing this with streaming, so this whole mess is going to be in a call to foreachRDD, but I expect that if I can get this working in the normal case, I can make it work in the streaming case.

BBischof
  • 310
  • 2
  • 13
  • If you think about it, you are essentially splitting an RDD into smaller RDDs. As discussed in one of the answers here: http://stackoverflow.com/questions/32970709/how-to-split-a-rdd-into-two-or-more-rdds you cannot split an RDD. The best you can do is to filter it. In your case, you would have to split it into individual rows. There's probably no great way to do it -- your answer below works fine. A possible optimization would be to pull only an Array of "name" into the driver, then one-by-one filter the RDD by "name" and feed that to the function. Depending on your dataset, that may help. – David Griffin Mar 22 '16 at 11:35

1 Answers1

0

I was able to find a solution:

coolRdd.
collect.
foreach{ case (name, data) => 
 val data_list = data.toList
 sc.coolFunction(sc.parallelize(data_list), pid)
}

Where I was mistaken was failing to collect. Because only the driver knows about RDDs, collect is necessary here.

BBischof
  • 310
  • 2
  • 13
  • Leaving question open for a bit in case someone has a slicker solution to add. – BBischof Mar 22 '16 at 04:54
  • This is very bad practice. If your data fits on the driver, why do you want to distribute it in a RDD ? There is even more, collecting an RDD may overwhelmed the driver if your data volume is big which can lead to OOME. – eliasah Mar 22 '16 at 06:11
  • You're totally right, I was going to remark on this. One thing that saves me is that the individual rdds in these cases should be small. Can you see a better strategy here? – BBischof Mar 22 '16 at 06:49
  • if your rdd is small, rather using broadcast variables then, or even regular collections – eliasah Mar 22 '16 at 06:52
  • Sorry could say more? I don't quite see how that would look. – BBischof Mar 22 '16 at 06:53
  • i'm talking about using regular scala for this part. i'm sorry, i can't say more about it. the code snippet you gave doesn't actually say what is you are doing to help you fix. – eliasah Mar 22 '16 at 06:57