I think you can solve this with the combineByKey function, which works on a PairRDD. Here the key would be (group, user).
The first step is simple: aggregate by key to get something like:
(G1, U1), 30
(G1, U2), 25
(G1, U3), 20
(G2, U1), 30
(G2, U2), 40
(G2, U3), 45
You can create a class, say TopRanked, that simply holds the top N results (e.g. an array of Tuple3<String, String, Double> of size N) and exposes a method insert(String, String, Double). This method is the key point: it should place the entry [String, String, Double] at its correct position in the ranking, dropping the lowest entry once more than N are held. It should also expose a method merge that, given two of these data structures, combines them into one that represents the top N of their combined contents.
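As a rough illustration of the structure described above, here is a minimal sketch in plain Java. The nested Entry type, the entries() accessor, and the choice of a sorted ArrayList are assumptions made for the example, not something the approach prescribes; a bounded heap would work as well. The class is Serializable because Spark needs to ship combiners between nodes.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the TopRanked structure; names other than
// TopRanked, insert and merge are assumptions made for illustration.
class TopRanked implements Serializable {

    // One ranked row: (group, user, score).
    static final class Entry implements Serializable {
        final String group;
        final String user;
        final double score;

        Entry(String group, String user, double score) {
            this.group = group;
            this.user = user;
            this.score = score;
        }
    }

    private final int n;                                    // maximum number of entries kept
    private final List<Entry> entries = new ArrayList<>();  // sorted by descending score

    TopRanked(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        this.n = n;
    }

    // Inserts the row at its ranked position; if more than n entries
    // are held afterwards, the lowest one is dropped.
    TopRanked insert(String group, String user, double score) {
        int pos = 0;
        while (pos < entries.size() && entries.get(pos).score >= score) {
            pos++;
        }
        if (pos < n) {
            entries.add(pos, new Entry(group, user, score));
            if (entries.size() > n) {
                entries.remove(entries.size() - 1);
            }
        }
        return this;
    }

    // Folds the other structure into this one, keeping the top n of both.
    TopRanked merge(TopRanked other) {
        for (Entry e : other.entries) {
            insert(e.group, e.user, e.score);
        }
        return this;
    }

    // Read-only view of the current ranking, best score first.
    List<Entry> entries() {
        return Collections.unmodifiableList(entries);
    }
}
```

Both insert and merge return this, so they can be used directly as the bodies of mergeValue and mergeCombiners.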
Then you define three functions:
createCombiner: Function<[(String, String), Double], TopRanked>
mergeValue: Function2<TopRanked, [(String, String), Double], TopRanked>
mergeCombiners: Function2<TopRanked, TopRanked, TopRanked>
What they should do is pretty straightforward:
createCombiner must create a new TopRanked object from the row [(String, String), Double] and then call insert(String, String, Double) on it.
mergeValue must insert the row [(String, String), Double] into its TopRanked argument and return it.
mergeCombiners simply calls merge on its two TopRanked arguments and returns the merged TopRanked object.
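To make it work, simply call pairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners) (note that you can also pass a Partitioner to optimize a bit!). Keep in mind that combineByKey combines values per key, so to get a top N per group, the pair RDD used for this call should be keyed by group alone, with the aggregated row as its value.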
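To see how the three functions cooperate, the following is a small Spark-free model of what combineByKey does with them. It is only a sketch: CombineByKeyModel and its combineByKey method are hypothetical helpers written for this answer, partitions are modelled as plain lists, and java.util.function types stand in for Spark's Function and Function2.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Local model of combineByKey semantics (hypothetical helper, not Spark code).
class CombineByKeyModel {

    // Each inner list stands for one partition of (key, value) pairs.
    static <K, V, C> Map<K, C> combineByKey(
            List<List<Map.Entry<K, V>>> partitions,
            Function<V, C> createCombiner,
            BiFunction<C, V, C> mergeValue,
            BiFunction<C, C, C> mergeCombiners) {

        Map<K, C> result = new HashMap<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            // Within a partition: createCombiner for the first value
            // seen for a key, mergeValue for every later value.
            Map<K, C> local = new HashMap<>();
            for (Map.Entry<K, V> kv : partition) {
                C current = local.get(kv.getKey());
                local.put(kv.getKey(), current == null
                        ? createCombiner.apply(kv.getValue())
                        : mergeValue.apply(current, kv.getValue()));
            }
            // Across partitions: mergeCombiners joins the combiners
            // that different partitions built for the same key.
            for (Map.Entry<K, C> kc : local.entrySet()) {
                C seen = result.get(kc.getKey());
                result.put(kc.getKey(), seen == null
                        ? kc.getValue()
                        : mergeCombiners.apply(seen, kc.getValue()));
            }
        }
        return result;
    }
}
```

With TopRanked as the combiner type C, createCombiner would build a new TopRanked and insert the first row, mergeValue would call insert, and mergeCombiners would call merge, which is why merge has to produce a valid top N from two partial rankings.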