I have an RDD P mapped to the class:
case class MyRating(userId:Int, itemId:Int, rating:Double)
I am interested in finding TopN entries for each User i.e. GroupBy userId and within each formed group, filter out the TopN (say 10) entries based on highest rating.
I did the following:
val A : RDD[((Int), Iterable[MyRating])] = P.keyBy(r => (r.userId)).groupByKey
val B : RDD[((Int), List[MyRating])] = key.mapValues(iter => iter.toList.sortBy(_.rating, false))
val C = values.groupByKey.take(10)
Clearly applying .take(10) after groupByKey leaves me with only 10 keys(Users) and will not filter out the top10 ratings for each User.
How do we go about applying .take(N) after a groupBy so that it acts on some part of value instead of key itself?