I slightly modified your input data:
val scores = sc.parallelize(Array(
("a", 1),
("a", 2),
("a", 3),
("b", 3),
("b", 1),
("a", 4),
("b", 4),
("b", 2),
("a", 6),
("b", 8)
))
Here is how to do it, step by step:
1. Group by key to collect all values for each key
scores.groupByKey().foreach(println)
Result:
(b,CompactBuffer(3, 1, 4, 2, 8))
(a,CompactBuffer(1, 2, 3, 4, 6))
As you can see, each value is now a collection of numbers. CompactBuffer is Spark's internal append-only buffer, similar to ArrayBuffer but more memory-efficient for small collections.
2. For each key, sort its numbers in descending order
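To check the grouping logic without a cluster, roughly the same result can be reproduced with plain Scala collections. This is a minimal sketch, assuming a local `Seq` named `localScores` (a hypothetical stand-in for the RDD) and a `Seq[Int]` in place of `CompactBuffer`:

```scala
// Hypothetical local copy of the RDD's data, used only for illustration
val localScores = Seq(
  ("a", 1), ("a", 2), ("a", 3), ("b", 3), ("b", 1),
  ("a", 4), ("b", 4), ("b", 2), ("a", 6), ("b", 8)
)

// Roughly what groupByKey does: gather every value under its key
val grouped: Map[String, Seq[Int]] =
  localScores.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }

grouped.foreach(println)
```

One difference to keep in mind: the local version preserves input order inside each group, while Spark does not guarantee the order of values inside each grouped collection.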
scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse)} ).foreach(println)
Result:
(b,List(8, 4, 3, 2, 1))
(a,List(6, 4, 3, 2, 1))
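Descending order can be written in a few equivalent ways. A small sketch on a plain `List` holding the values grouped under key `b` (the variable names are only illustrative):

```scala
val bScores = List(3, 1, 4, 2, 8)

// Same approach as step 2: sort with a reversed Ordering
val viaReverseOrdering = bScores.sorted(Ordering[Int].reverse)

// Alternative: sort by the negated value (safe as long as no score is Int.MinValue)
val viaNegation = bScores.sortBy(-_)

// Alternative: sort ascending, then reverse the list
val viaReverse = bScores.sorted.reverse

println(viaReverseOrdering) // List(8, 4, 3, 2, 1)
```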
3. Keep only the first 2 elements of each list from step 2; these are the top 2 scores for that key
scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)} ).foreach(println)
Result:
(a,List(6, 4))
(b,List(8, 4))
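The per-key work of steps 2 and 3 can be pulled into one small function. A sketch, where `topN` is a hypothetical helper name; note that `take` simply returns fewer elements when a key has fewer than `n` scores, so no special case is needed:

```scala
// Hypothetical helper: sort one key's scores descending and keep the n largest
def topN(values: Iterable[Int], n: Int): List[Int] =
  values.toList.sorted(Ordering[Int].reverse).take(n)

println(topN(List(1, 2, 3, 4, 6), 2)) // List(6, 4)
println(topN(List(5), 2))             // List(5)
```

On the RDD, the same function could be applied with `scores.groupByKey().mapValues(v => topN(v, 2))`, since `mapValues` keeps the key and only transforms the value.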
4. Use flatMap to turn each key and its list of top scores into separate (key, score) pairs, creating a new pair RDD
scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)} ).flatMap({case (k, numbers) => numbers.map(k -> _)}).foreach(println)
Result:
(b,8)
(b,4)
(a,6)
(a,4)
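The flattening in step 4 behaves the same way on an ordinary list of pairs. A local sketch using the output of step 3 as input (`topPerKey` and `flattened` are illustrative names):

```scala
// Per-key top scores, as produced by step 3
val topPerKey = List("a" -> List(6, 4), "b" -> List(8, 4))

// Step 4: emit one (key, score) pair for each element of each list
val flattened = topPerKey.flatMap { case (k, numbers) => numbers.map(k -> _) }

println(flattened) // List((a,6), (a,4), (b,8), (b,4))
```

On a pair RDD, `flatMapValues(numbers => numbers)` should produce the same pairs, because it repeats the key for every element the function returns.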
5. Optional: sort by key in ascending order (collect() brings the results to the driver so they print in sorted order)
scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)} ).flatMap({case (k, numbers) => numbers.map(k -> _)}).sortByKey().collect().foreach(println)
Result:
(a,6)
(a,4)
(b,8)
(b,4)
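Putting steps 1 to 5 together, the whole pipeline can be sketched on plain Scala collections, which may help when unit-testing the logic before running it on Spark. The function name `topScoresPerKey` is hypothetical:

```scala
// Hypothetical local version of steps 1 to 5 on plain collections
def topScoresPerKey(pairs: Seq[(String, Int)], n: Int): Seq[(String, Int)] =
  pairs
    .groupBy(_._1)                                        // step 1: group by key
    .toSeq
    .flatMap { case (k, kv) =>
      kv.map(_._2).sorted(Ordering[Int].reverse).take(n)  // steps 2 and 3
        .map(k -> _)                                      // step 4: back to pairs
    }
    .sortBy(_._1)                                         // step 5: ascending by key (stable sort)

val input = Seq(("a", 1), ("a", 2), ("a", 3), ("b", 3), ("b", 1),
                ("a", 4), ("b", 4), ("b", 2), ("a", 6), ("b", 8))

topScoresPerKey(input, 2).foreach(println)
```

Because `sortBy` is stable, the descending order within each key from step 2 survives the final sort.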
I hope this explanation helps you understand the logic.
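groupByKey collects every value of a key in one place, which can be expensive when some keys have many scores. A commonly used alternative is aggregateByKey, which keeps only the top n values per key while combining. The sketch below tests the two combine functions locally, assuming n = 2 and a descending-sorted `List[Int]` as the accumulator; `keepTop`, `seqOp` and `combOp` are hypothetical names. On Spark they would be passed as `scores.aggregateByKey(List.empty[Int])(seqOp, combOp)`:

```scala
val n = 2

// Keep the n largest values, in descending order
def keepTop(xs: List[Int]): List[Int] = xs.sorted(Ordering[Int].reverse).take(n)

// seqOp: fold one score into a partition-local accumulator
val seqOp: (List[Int], Int) => List[Int] = (acc, score) => keepTop(score :: acc)

// combOp: merge accumulators coming from different partitions
val combOp: (List[Int], List[Int]) => List[Int] = (a, b) => keepTop(a ++ b)

// Local simulation: the values for key "b" split across two "partitions"
val part1 = List(3, 1, 4).foldLeft(List.empty[Int])(seqOp)
val part2 = List(2, 8).foldLeft(List.empty[Int])(seqOp)
println(combOp(part1, part2)) // List(8, 4)
```

An accumulator never holds more than n + 1 values in seqOp or 2n values in combOp, so no key's full list has to be materialized; the flatMap from step 4 then turns the lists back into (key, score) pairs.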