1

I have an RDD P mapped to the class:

case class MyRating(userId:Int, itemId:Int, rating:Double)

I am interested in finding TopN entries for each User i.e. GroupBy userId and within each formed group, filter out the TopN (say 10) entries based on highest rating.

I did the following:

val A : RDD[((Int), Iterable[MyRating])] = P.keyBy(r => (r.userId)).groupByKey
val B : RDD[((Int), List[MyRating])] = key.mapValues(iter => iter.toList.sortBy(_.rating, false))
val C = values.groupByKey.take(10)

Clearly applying .take(10) after groupByKey leaves me with only 10 keys(Users) and will not filter out the top10 ratings for each User.

How do we go about applying .take(N) after a groupBy so that it acts on some part of value instead of key itself?

zero323
  • 322,348
  • 103
  • 959
  • 935
srbhkmr
  • 2,074
  • 1
  • 14
  • 19

5 Answers5

3

A naive approach is to take n values:

B.mapValues(_.take(n))

but if you need only small subset of values it would be better to use for example aggregateByKey and drop obsolete records on the run instead of grouping everything. You probable want want something more efficient in practice (you can check Spark implementation of top / takeOrdered) but you can start with something like this:

import scala.math.Ordering
import scala.collection.mutable.PriorityQueue

implicit val ord = Ordering.by[MyRating, Double](_.rating)

val pairs = rdd.keyBy(_.userId)
pairs.aggregateByKey(new scala.collection.mutable.PriorityQueue[MyRating]())(
  (acc, x) => {
    acc.enqueue(x)
    acc.take(n)
  },
  (acc1, acc2) => (acc1 ++ acc2).take(n)
)

Note that above snippet requires Scala 2.11+ due to SI-7568.

zero323
  • 322,348
  • 103
  • 959
  • 935
3

If I understand correctly, what you need to do is: group the RDD by user Id, and then for every (id, list) tuple give back the id and the list sorted and trimmed to 10 elements

P
  .groupBy(_.userId)  
  .map{ case (key, it) => 
    (key, it.toList.sortBy(mr => -mr.rating).take(10)) 
  }
Daniel B.
  • 929
  • 4
  • 8
1

You were very close, but you need to take the top-N entries within the mapping of A to B. For example, if you wanted to take the top 2 MyRating items from a List, the below code would do the trick. B would be an RDD containing a List of the top 2 MyRatings for each userId. (Also, the sortBy function will work simply by making the rating negative).

case class MyRating(userId:Int, itemId:Int, rating:Double)

val plist:List[MyRating] = List(MyRating(1,0,1),MyRating(1,1,5),MyRating(1,2,7),MyRating(1,3,9),MyRating(1,4,10),MyRating(2,5,1),MyRating(2,6,5),MyRating(2,6,7))
val P: org.apache.spark.rdd.RDD[MyRating] = sc.parallelize(plist)

val A : RDD[((Int), Iterable[MyRating])] = P.keyBy(r => (r.userId)).groupByKey
val TOPCOUNT = 2
val B : RDD[((Int), List[MyRating])] = A.mapValues(iter => iter.toList.sortBy(- _.rating).take(TOPCOUNT))
pilot7
  • 58
  • 6
1

Here is an example using aggregateByKey as suggested by zero323:

val A : RDD[(Int, MyRating)] = P.keyBy(r => r.userId)
val B = A.aggregateByKey(List[MyRating]())(
  (l, r) => (l :+ r).sortBy(-_.rating).take(10),
  (l1, l2) => (l1 ++ l2).sortBy(-_.rating).take(10))

The benefit of using this method is that you don't possibly shuffle a large amount of data between your executors. If the ratings from a single user are distributed over multiple nodes, groupBy needs to send all the ratings for a user to the same executor, whereas with aggregateByKey first a top-N list is built on each executor, and then only those lists are shuffled and combined.

Whether this is beneficial to you depends on the distribution of data. If you don't have many more ratings than the final top you are after you're not gaining much (especially with my naive implementation that does a sort for each separate rating). However, if the number of ratings per executor is orders of magnitude larger you can win a lot.

sgvd
  • 3,819
  • 18
  • 31
0

I found the related post : Spark: Get top N by key

Copying @jbochi's recommendation :

Since version 1.4, there is a built-in way to do this using MLLib: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala

val scores = sc.parallelize(Array(
      ("a", 1),  
      ("a", 2), 
      ("a", 3), 
      ("b", 3), 
      ("b", 1), 
      ("a", 4),  
      ("b", 4), 
      ("b", 2)
))
import org.apache.spark.mllib.rdd.MLPairRDDFunctions.fromPairRDD
scores.topByKey(2) // Where the keys are a and b
abc123
  • 527
  • 5
  • 16