
My RDD contains three values: [Group, User, Bytes]. I need to sum the bytes consumed by each user and then get the top N users by total bytes within each group.

For example, with this input data:

G1 U1 10
G1 U1 20
G1 U2 25
G1 U3 20
G2 U1 30
G2 U2 15
G2 U2 25
G2 U3 45

A top-2 query should return:

G1 U1 30
G1 U2 25
G2 U3 45
G2 U2 40

So far my code is as follows:

rdd: RDD[((String, String), Double)]
rdd.reduceByKey((x,y) => (x+y))
    .map {
       x => ((x._1._1), (x._1._2, x._2))
    }.sortBy(x => x._2._2, false)

I have yet to figure out how to group by the Group value and then take only the top N results. Can anyone help, or is there a better way to solve this?

Firdousi Farozan
  • how do you create your rdd? – eliasah May 28 '15 at 15:36
  • I use a flatMap on an RDD loaded from a text file (this is for testing purposes; in reality, I will change this to read from Cassandra). The flatMap takes a method such as: def traverseDataPoint(dataPoint: Array[Byte]): ListBuffer[((String, String), Double)] – Firdousi Farozan May 28 '15 at 16:56

2 Answers


From your question, it seems you are trying to compute a SQL-style rank within each group.

Here is my solution. It might not be the most efficient, but it works.

val rddsum = rdd.reduceByKey((x,y) => (x+y)).map(x => (x._1._1,x._1._2,x._2))

This gives the total bytes per (group, user):

(G1, U1, 30)
(G1, U2, 25)
(G1, U3, 20)
(G2, U1, 30)
(G2, U2, 40)
(G2, U3, 45)

Now group by the first column and use mapValues to attach a rank:

val grpd = rddsum.groupBy{x => x._1}

// sort descending by total bytes so that rank 1 is the largest total
val sortAndRankedItems = grpd.mapValues{ it => it.toList.sortBy{x => -x._3}.zip(Stream from 1) }

Now sortAndRankedItems is of type RDD[(String, List[((String, String, Double), Int)])]. Flat-map it to keep only the second element (the ranked list), filter to the top N entries (here N = 2), and then keep only the first element of each pair, i.e. the tuple, to arrive at the answer.

val result = sortAndRankedItems.flatMap{case(m,n) => n}.filter{x => x._2 <= 2}.map{case(x,y) => x}
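
As a rough local check of the steps above, here is a sketch that uses plain Scala collections as a stand-in for the RDD, so it runs without Spark. The names TopNPerGroupLocal and topNPerGroup are made up for illustration, and the tie-break on user name is an assumption that the question does not specify.

```scala
object TopNPerGroupLocal {
  // Sample rows from the question: ((group, user), bytes)
  val rows: Seq[((String, String), Double)] = Seq(
    (("G1", "U1"), 10.0), (("G1", "U1"), 20.0), (("G1", "U2"), 25.0), (("G1", "U3"), 20.0),
    (("G2", "U1"), 30.0), (("G2", "U2"), 15.0), (("G2", "U2"), 25.0), (("G2", "U3"), 45.0)
  )

  def topNPerGroup(data: Seq[((String, String), Double)], n: Int): Seq[(String, String, Double)] = {
    // Step 1: total bytes per (group, user); local stand-in for reduceByKey
    val sums: Seq[(String, String, Double)] =
      data.groupBy(_._1).toSeq.map { case ((g, u), vs) => (g, u, vs.map(_._2).sum) }

    // Step 2: group by the group column, sort each group by bytes descending, keep n
    sums.groupBy(_._1).toSeq
      .sortBy(_._1) // fixed group order, only to make the output predictable
      .flatMap { case (_, users) =>
        // assumed tie-break: equal totals are ordered by user name
        users.sortWith((a, b) => a._3 > b._3 || (a._3 == b._3 && a._2 < b._2)).take(n)
      }
  }

  val result: Seq[(String, String, Double)] = topNPerGroup(rows, 2)
}
```

With the sample data, TopNPerGroupLocal.result holds (G1,U1,30.0), (G1,U2,25.0), (G2,U3,45.0), (G2,U2,40.0), which matches the expected top-2 output in the question.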

Hope it helps!!

Akash

I think you should be able to solve this with the combineByKey function, which works on a pair RDD; here the key would be (group, user).

The first step, which is simple, is to aggregate by key to get something like:

(G1, U1), 30
(G1, U2), 25
(G1, U3), 20
(G2, U1), 30
(G2, U2), 40
(G2, U3), 45

You can create a class, say TopRanked, that simply holds the top N results (e.g. an array of Tuple<String, String, Double> of size N) and exposes a method insert(String, String, Double). This method is the key point: it should insert the object [String, String, Double] at its correct position. The class should also expose a merge method that, given two of these data structures, merges them into one that represents the top N of the combined data.
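
A minimal sketch of what such a class might look like in Scala follows. It assumes an immutable sorted List instead of a fixed-size array, and the field names n and entries are made up for illustration; only the names TopRanked, insert, and merge come from the description above.

```scala
// Hypothetical TopRanked: keeps at most n (group, user, bytes) entries, largest bytes first.
final case class TopRanked(n: Int, entries: List[(String, String, Double)] = Nil) {

  // Insert one entry at its sorted position, then drop anything beyond the first n.
  def insert(group: String, user: String, bytes: Double): TopRanked = {
    val (larger, rest) = entries.span(_._3 >= bytes)
    copy(entries = (larger ::: (group, user, bytes) :: rest).take(n))
  }

  // Merge by re-inserting the other structure's entries; the result is again capped at n.
  def merge(other: TopRanked): TopRanked =
    other.entries.foldLeft(this) { case (acc, (g, u, b)) => acc.insert(g, u, b) }
}
```

For example, inserting (G1, U1, 30), (G1, U2, 25), and (G1, U3, 20) into TopRanked(2) leaves only the U1 and U2 entries, in that order. An immutable structure is used here because it is easy to reason about; a mutable array, as described above, would avoid some allocation.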

Then you define three functions:

createCombiner: Function<[(String, String), Double], TopRanked>
mergeValue: Function2<TopRanked, [(String, String), Double], TopRanked>
mergeCombiners: Function2<TopRanked, TopRanked, TopRanked>

What they should do is pretty straightforward:

createCombiner must create a new TopRanked object from the row [(String, String), Double] and then call insert(String, String, Double) on it.

mergeValue must insert [(String, String), Double] into its TopRanked and return it.

mergeCombiners simply calls merge on its two TopRanked arguments and returns the merged TopRanked object.

To make it work, simply call pairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners) (note that you can add a Partitioner to optimize a bit!).
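
The three functions could be sketched as below. To keep the block self-contained and runnable without Spark, this sketch makes several assumptions that are not part of the answer: a sorted List stands in for TopRanked, the per-(group, user) totals from the first step are re-keyed by group, and the hypothetical helper combineLocally imitates combineByKey over two hand-made "partitions".

```scala
object CombineByKeySketch {
  type Entry = (String, Double) // (user, totalBytes) after the first aggregation step
  type Top   = List[Entry]      // stand-in for TopRanked: bytes descending, at most N entries
  val N = 2

  // Insert at the sorted position and cap the list at N entries.
  private def insert(top: Top, e: Entry): Top = {
    val (larger, rest) = top.span(_._2 >= e._2)
    (larger ::: e :: rest).take(N)
  }

  val createCombiner: Entry => Top       = e => insert(Nil, e)
  val mergeValue: (Top, Entry) => Top    = (top, e) => insert(top, e)
  val mergeCombiners: (Top, Top) => Top  = (a, b) => b.foldLeft(a)(insert)

  // Local imitation of combineByKey: combine within each partition, then merge across partitions.
  def combineLocally(partitions: Seq[Seq[(String, Entry)]]): Map[String, Top] = {
    val perPartition: Seq[Map[String, Top]] = partitions.map { part =>
      part.foldLeft(Map.empty[String, Top]) { case (acc, (group, e)) =>
        acc.updated(group, acc.get(group).map(mergeValue(_, e)).getOrElse(createCombiner(e)))
      }
    }
    perPartition.flatten.groupBy(_._1).map { case (group, tops) =>
      group -> tops.map(_._2).reduce(mergeCombiners)
    }
  }

  // Per-(group, user) totals, keyed by group and split over two partitions.
  val partitions: Seq[Seq[(String, Entry)]] = Seq(
    Seq(("G1", ("U1", 30.0)), ("G2", ("U1", 30.0)), ("G2", ("U2", 40.0))),
    Seq(("G1", ("U2", 25.0)), ("G1", ("U3", 20.0)), ("G2", ("U3", 45.0)))
  )

  val result: Map[String, Top] = combineLocally(partitions)
}
```

Here result maps G1 to U1 (30.0) and U2 (25.0), and G2 to U3 (45.0) and U2 (40.0). Keeping only N entries per partition is safe in this sketch because each (group, user) total appears exactly once after the first step, so an entry that misses its partition's top N cannot be in the overall top N. With a real pair RDD keyed by group, the same three functions would be passed to combineByKey(createCombiner, mergeValue, mergeCombiners).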

Vince.Bdn
  • Thanks. Will try it out. But when the aggregation happens at a per-partition level, how will the top N work correctly if we only store the top N of that partition? Wouldn't we have to store all tuples to get the right top N? – Firdousi Farozan May 28 '15 at 18:21
  • If you know some internals of topN API of RDD, please share. – Firdousi Farozan May 28 '15 at 18:21
  • Hey! The `TopRanked` class would look something like a wrapper around a simple fixed-size array of N `[String, String, Double]` entries. It would have a constructor that takes the size N as input and instantiates this array; an `insert` method that inserts `[String, String, Double]` at its position if the double belongs in the top N, and otherwise does nothing; and a `merge` method that merges two of these structures (like the merge step of merge sort). The solution above by @Anda is a good one but implies shuffles you might want to avoid with big data. – Vince.Bdn May 29 '15 at 07:46
  • By the way, if you want to understand why the solution above might be too expensive with shuffles, here's something you might want to read: [link](http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html) – Vince.Bdn May 29 '15 at 08:02
  • Thanks for the link. I understand that groupByKey is expensive. – Firdousi Farozan Jun 01 '15 at 12:20
  • I am still trying to understand how this fixed-size N array will help. Spark workers will operate on each partition and, let's say, there are two partitions. There will be an N-size TopRanked array in each partition. The top N in each partition will be accumulated in this object, and anything below the top N will be discarded. By this logic, we may lose entries from the actual top N that fall below the top N of their partition. Let me know if my understanding is wrong. – Firdousi Farozan Jun 01 '15 at 12:23