3

I have an RDD in which each entry belongs to a class. I want to split this single RDD into several RDDs, such that all entries of a class go into one RDD. Suppose I have 100 such classes in the input RDD; I want each class in its own RDD. I can do this with a filter for each class (as shown below), but that launches several jobs. Is there a better way to do it in a single job?

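import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

def method(input: RDD[LabeledPoint], classes: List[Double]): List[RDD[LabeledPoint]] =
  classes.map { lbl => input.filter(_.label == lbl) }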

It's similar to another question, but I have more than 2 classes (around 10).

Arun
  • "I want each class into its own RDD." Why? What will you do with them afterwards? – The Archetypal Paul Apr 16 '15 at 13:23
  • Well... the Spark RDD model was not created with such operations in mind. But if you still want something like this, you can always use the most obvious approach (as you did). Now, about being able to do this in a "single job" (most operations on a single RDD really involve multiple jobs, so I am not sure what you mean by "single job", but let's assume you mean O(n) operations that do not depend on the number of classes): I don't think it "should" be possible according to the current philosophy of RDDs. – sarveshseri Apr 16 '15 at 13:48
  • @Paul Another method (StatisticsSummary) expects an RDD as input. I want to get the summary statistics for each class – Arun Apr 16 '15 at 15:37
  • I don't think it's really a dupe, since you are wanting to divide into more than two RDDs. I upvoted b/c I really like your solution to that! – Kevin Pauli Jul 23 '15 at 21:29

2 Answers

2

I faced the same issue and, unfortunately, according to the resources I found, there is no other way.

The problem is that you need to go from a single RDD to the list of RDDs in your result, and if you look here, the answer also says it's not possible.

What you are doing should be fine. If you want to optimize it, cache the input data if you can, so that each filter reads the cached data instead of recomputing it.
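A minimal sketch of the cache-then-filter approach described above, assuming Spark MLlib's `LabeledPoint` as in the question. The helper name `splitByClass` and the choice to return a `Map` are illustrative assumptions, not something from the original post. This still runs one job per class when an action is called on each result, but every job reads the cached partitions rather than recomputing the input.

```scala
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical helper (not from the original post): cache once, then filter per class.
def splitByClass(input: RDD[LabeledPoint],
                 classes: List[Double]): Map[Double, RDD[LabeledPoint]] = {
  // Only request caching if the caller has not already chosen a storage level;
  // Spark refuses to change the level of an RDD that already has one.
  if (input.getStorageLevel == StorageLevel.NONE) input.cache()

  // filter is lazy: nothing runs until an action is called on a per-class RDD.
  classes.map(lbl => lbl -> input.filter(_.label == lbl)).toMap
}
```

Each value in the returned map is an ordinary RDD, so it can be passed to a method that expects one, for example `Statistics.colStats(perClass(lbl).map(_.features))`.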

Ivan Nikolov
  • Is there any way to change the Spark code to support that? An RDD is a set of partitions, and a partition can be split into List[List[]]. How can the partitions be divided to create a List[RDD]? – Arun Apr 16 '15 at 12:34
  • Operations on RDDs return other RDDs. This is how the API is defined. I wouldn't go against this. You might be able to change something, but I think it would break everything else and take you quite a lot of time, and even if it works, I'm not sure it would be accepted as a pull request. Caching the data set is the best you can do, and I'd say it is what you should do. Is there a reason you want to avoid doing this? – Ivan Nikolov Apr 16 '15 at 13:01
  • Accepting, as caching seems to help with the run time costs. Thanks – Arun Apr 17 '15 at 02:09
0

AFAIK that is not possible, but you might have a conceptual problem.

Given your comments, what you probably want is to use aggregateByKey(). No need to create a hundred RDDs, just have one keyed by class and build a custom aggregation method that aggregates your stats. Spark will distribute your instances around by class such that you can operate on them independently. If the logic changes depending on class, you can always use if/elses, switches, polymorphism, etc, etc...
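As a rough sketch of that idea, assuming Spark 1.3 or later (where pair-RDD operations are available without extra imports) and MLlib's `MultivariateOnlineSummarizer` as the per-class accumulator: the helper name `statsByClass` is hypothetical, and the summarizer is one possible choice of aggregation, not necessarily what the asker needs.

```scala
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.rdd.RDD

// Hypothetical helper (not from the original post): per-class column statistics
// computed with a single aggregateByKey instead of one filtered RDD per class.
def statsByClass(input: RDD[LabeledPoint]): Map[Double, MultivariateOnlineSummarizer] =
  input
    .map(p => (p.label, p.features)) // key every point by its class label
    .aggregateByKey(new MultivariateOnlineSummarizer)(
      (summary, features) => summary.add(features), // fold a point into the partition-local summary
      (left, right) => left.merge(right)            // combine partial summaries from different partitions
    )
    .collect() // one small summary object per class reaches the driver
    .toMap
```

The result can be queried per class, for example `statsByClass(input)(lbl).mean` or `.variance`. Only the partial summaries are shuffled, not the points themselves, because the first function runs within each partition before anything is exchanged.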

Daniel Langdon
  • I want to create an RDD for each class rather than use aggregateByKey, as that would aggregate the values of a class into a single partition. If I only have 5 classes, there would be a lot of data movement. I also need this because another method (Statistics.colStats) requires an RDD, so aggregating and then creating an RDD again would be costly. – Arun Apr 17 '15 at 02:08
  • Then I guess you are screwed, you'll have to create each new RDD by filtering the old one :-S But do note that aggregateByKey first aggregates separately within each partition and only then aggregates the intermediate results (similar to Hadoop combiners). That's why it is recommended over groupByKey(). Depending on your version, you might also look at combineByKey() and reduceByKey(). Maybe you can rewrite your statistics this way... or not... check it out! – Daniel Langdon Apr 17 '15 at 13:27