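The solution is to iterate through the values that groupByKey() collects for each key (here, each product category), extract the top N records by price, and append them to a new list. Note that getTopN keeps every record whose price is among the N highest distinct prices in the category, so ties can yield more than N records.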
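The general pattern can be sketched with plain Scala collections, assuming local data stands in for the RDD: groupBy plays the role of groupByKey(), and each group's values are then sorted and truncated. The pairs below are made-up sample data, and unlike getTopN this sketch takes a strict top N per key, without keeping ties.

```scala
// Hypothetical (key, value) pairs standing in for an RDD of key/value pairs
val pairs = Seq(("a", 5), ("a", 9), ("a", 1), ("b", 7), ("b", 3))

// groupBy plays the role of groupByKey(): one (key, values) entry per key
val grouped: Map[String, Seq[Int]] =
  pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }

// For each group, sort the values in descending order and keep the first n
def topNPerKey(n: Int): Map[String, List[Int]] =
  grouped.map { case (k, vs) => (k, vs.toList.sortBy(v => -v).take(n)) }

val top2 = topNPerKey(2)
top2.foreach(println)
```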
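Below is a working example. You can run it on the Cloudera VM, since it uses the Cloudera sample data set. Before running it, make sure you have a products RDD generated from the products table in the MySQL retail_db database. The code assumes each record is a comma-separated line with the category id in the 2nd field and the price in the 5th, and that no field itself contains a comma (otherwise the split misaligns the fields).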
getTopN function ->
    def getTopN(rec: (String, Iterable[String]), topN: Int): Iterable[String] = {
      // the price is the 5th comma-separated field (index 4)
      def price(r: String): Float = r.split(",")(4).toFloat
      // top N distinct prices in this category, highest first
      val topNPrices = rec._2.map(price).toList.distinct.sortBy(p => -p).take(topN)
      // every record whose price is among those N prices, sorted by price descending
      // (ties are kept, so more than N records can be returned)
      rec._2.toList.
        sortBy(r => -price(r)).
        filter(r => topNPrices.contains(price(r)))
    }
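To see how ties are handled, here is a minimal local sketch that runs the same logic as getTopN on a few hypothetical product lines from a single category, with no Spark required. With topN = 2, two products share the highest price, so three records come back.

```scala
// Same logic as getTopN above, repeated so this sketch is self-contained
def getTopN(rec: (String, Iterable[String]), topN: Int): Iterable[String] = {
  def price(r: String): Float = r.split(",")(4).toFloat
  val topNPrices = rec._2.map(price).toList.distinct.sortBy(p => -p).take(topN)
  rec._2.toList.sortBy(r => -price(r)).filter(r => topNPrices.contains(price(r)))
}

// Hypothetical records of one category: id,category_id,name,description,price,image
val category2 = List(
  "1,2,Alpha,d,59.98,a",
  "2,2,Beta,d,129.99,b",
  "3,2,Gamma,d,89.99,c",
  "4,2,Delta,d,129.99,d",
  "5,2,Eps,d,29.99,e"
)

// Top 2 distinct prices are 129.99 and 89.99; Beta and Delta tie on 129.99
val result = getTopN(("2", category2), 2).toList
result.foreach(println)
```

Because sortBy is stable, tied records keep their original relative order (Beta before Delta here).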
Main code ->
    // code to generate the products RDD goes here
    // key each product by its category id (2nd field, index 1)
    val productsMap = products.
      map(rec => (rec.split(",")(1), rec))
    productsMap.
      groupByKey().
      flatMap(x => getTopN(x, 3)).
      collect().
      foreach(println)
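If you want to try the whole pipeline without a cluster, the sketch below mimics it on a local List. Here groupBy followed by a map over the grouped values stands in for groupByKey(), and the records are hypothetical sample lines laid out to match the indexes the code uses (category in field 2, price in field 5). The groups are sorted by key only to make the local output order predictable; the Spark version does not return categories in any particular key order.

```scala
// Same logic as getTopN above, repeated so this sketch is self-contained
def getTopN(rec: (String, Iterable[String]), topN: Int): Iterable[String] = {
  def price(r: String): Float = r.split(",")(4).toFloat
  val topNPrices = rec._2.map(price).toList.distinct.sortBy(p => -p).take(topN)
  rec._2.toList.sortBy(r => -price(r)).filter(r => topNPrices.contains(price(r)))
}

// Hypothetical product lines: id,category_id,name,description,price,image
val products = List(
  "1,2,A,d,10.0,x",
  "2,2,B,d,30.0,x",
  "3,2,C,d,20.0,x",
  "4,2,D,d,40.0,x",
  "5,3,E,d,15.0,x",
  "6,3,F,d,25.0,x"
)

// Same shape as productsMap: (category_id, whole record)
val productsMap = products.map(rec => (rec.split(",")(1), rec))

// groupBy + map stands in for groupByKey(); flatMap applies getTopN per category
val top3 = productsMap.
  groupBy(_._1).
  map { case (cat, kvs) => (cat, kvs.map(_._2)) }.
  toList.
  sortBy(_._1).
  flatMap(x => getTopN(x, 3))

top3.foreach(println)
```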