
I have an RDD in which each row has the form (key, (int, double)).

I would like to transform it into (key, ((int, double), (int, double), ...)),

where the values in the new RDD are the top N pairs for each key, sorted by the double.

So far I have come up with the solution below, but it is really slow and seems to run forever. It works fine on a smaller RDD, but the RDD is now too big.

val top_rated = test_rated.partitionBy(new HashPartitioner(4)).sortBy(_._2._2).groupByKey()
            .mapValues(x => x.takeRight(n))

Is there a better, faster way to do this?

yliu

2 Answers


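The most efficient way is probably aggregateByKey, which lets you keep only a running top-N array per key instead of sorting or grouping the whole RDD: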

import org.apache.spark.rdd.RDD

type K = String
type V = (Int, Double)
val rdd: RDD[(K, V)] = ???

// TODO: implement a function that adds a value to a sorted array and keeps the top N elements. Returns the same array
def addToSortedArray(arr: Array[V], newValue: V): Array[V] = ???
// TODO: implement a function that merges 2 sorted arrays and keeps the top N elements. Returns the first array
def mergeSortedArrays(arr1: Array[V], arr2: Array[V]): Array[V] = ???

val result: RDD[(K, Array[V])] =
  rdd.aggregateByKey(zeroValue = new Array[V](0))(seqOp = addToSortedArray, combOp = mergeSortedArrays)
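
One possible way to fill in the two TODO functions is sketched below. This is only an illustration, not the answerer's implementation: the wrapper object TopNSketch, the fixed n = 2, the descending order by the double, and the sample values are all assumptions. For simplicity it allocates a new array on every call instead of reusing the input array, and it runs on plain Scala collections so it can be tried without a cluster.

```scala
object TopNSketch {
  type V = (Int, Double)

  // Assumption: N is fixed up front; here 2, purely for illustration.
  val n = 2

  // Merges two arrays that are each sorted by descending double,
  // keeping at most n elements. Allocates a new array.
  def mergeSortedArrays(arr1: Array[V], arr2: Array[V]): Array[V] = {
    val out = Array.newBuilder[V]
    var i = 0
    var j = 0
    var taken = 0
    while (taken < n && (i < arr1.length || j < arr2.length)) {
      // Take from arr1 when arr2 is exhausted or arr1's next value is at least as large.
      val takeLeft = j >= arr2.length || (i < arr1.length && arr1(i)._2 >= arr2(j)._2)
      if (takeLeft) { out += arr1(i); i += 1 } else { out += arr2(j); j += 1 }
      taken += 1
    }
    out.result()
  }

  // Adding a single value is a merge with a one-element array.
  def addToSortedArray(arr: Array[V], newValue: V): Array[V] =
    mergeSortedArrays(arr, Array(newValue))

  // Local simulation of two partitions holding values for the same key.
  val values: Seq[V] = Seq((1, 10.0), (4, 40.0), (3, 30.0), (5, 50.0), (2, 20.0))
  val (left, right) = values.splitAt(2)
  val top: Array[V] = mergeSortedArrays(
    left.foldLeft(Array.empty[V])(addToSortedArray),
    right.foldLeft(Array.empty[V])(addToSortedArray)
  )
  // top contains (5,50.0), (4,40.0)
}
```

With Spark on the classpath, the same two functions should plug into the call above unchanged, for example rdd.aggregateByKey(Array.empty[V])(addToSortedArray, mergeSortedArrays). Because each per-key array never holds more than n elements, the amount of data shuffled per key stays small.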
simpadjo

Since you are interested only in the top N values per key, I would suggest that you avoid sorting the entire RDD. In addition, use the better-performing reduceByKey rather than groupByKey if at all possible. Below is an example using a topN method, borrowed from this blog:

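// Returns the n pairs with the largest doubles, in descending order. Assumes n >= 1.
def topN(n: Int, list: List[(Int, Double)]): List[(Int, Double)] = {
  // Reorders l so that its smallest element (by the double) sits at the head.
  def bigHead(l: List[(Int, Double)]): List[(Int, Double)] = l match {
    case Nil => l
    case _ => l.tail.foldLeft( List(l.head) )( (acc, x) =>
        if (x._2 <= acc.head._2) x :: acc else acc :+ x
      )
  }
  // Replaces the current minimum with e when e is larger, then restores the invariant.
  def update(l: List[(Int, Double)], e: (Int, Double)): List[(Int, Double)] = {
    if (e._2 > l.head._2) bigHead(e :: l.tail) else l
  }
  list.drop(n).foldLeft( bigHead(list.take(n)) )( update ).sortWith(_._2 > _._2)
}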

val rdd = sc.parallelize(Seq(
  ("a", (1, 10.0)), ("a", (4, 40.0)), ("a", (3, 30.0)), ("a", (5, 50.0)), ("a", (2, 20.0)),
  ("b", (3, 30.0)), ("b", (1, 10.0)), ("b", (4, 40.0)), ("b", (2, 20.0))
))

val n = 2

rdd.
  map{ case (k, v) => (k, List(v)) }.
  reduceByKey{ (acc, x) => topN(n, acc ++ x) }.
  collect
// res1: Array[(String, List[(Int, Double)])] =
//   Array((a,List((5,50.0), (4,40.0))), (b,List((4,40.0), (3,30.0))))
Leo C