
Reading the docs for the Spark method sortByKey:

sortByKey([ascending], [numTasks])   When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

Is it possible to return just the top N results, say the top 10, instead of the whole dataset? I could convert the sorted collection to an Array and use the take method, but since this is an O(N) operation, is there a more efficient approach?

blue-sky

3 Answers


If you only need the top 10, use rdd.top(10). It avoids sorting, so it is faster.

rdd.top makes one parallel pass through the data, collecting the top N of each partition in a heap, then merging the heaps. It is an O(rdd.count) operation. Sorting would be O(rdd.count log rdd.count) and incur a lot of data transfer: it does a shuffle, so all of the data would be transmitted over the network.
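The per-partition heap-and-merge idea can be sketched in plain Python, no Spark required (heapq stands in for Spark's bounded priority queue; the function name and data are hypothetical):

```python
import heapq
from typing import Iterable, List

def top_n(partitions: Iterable[List[int]], n: int) -> List[int]:
    """Mimic RDD.top: keep the n largest per partition, then merge."""
    # One pass per partition, each keeping only its n largest.
    per_partition = [heapq.nlargest(n, part) for part in partitions]
    # Merge the small per-partition results; only n * numPartitions
    # elements ever need to travel to the driver.
    merged = [x for part in per_partition for x in part]
    return heapq.nlargest(n, merged)

partitions = [[5, 1, 9], [7, 3, 8], [2, 6, 4]]
print(top_n(partitions, 3))  # [9, 8, 7]
```

The key point is that no partition ever materializes a full sort; each keeps at most n candidates.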

Daniel Darabos
  • I did not know about this method. It is a better solution than sort(), so this is a better answer to the question than mine (though mine does maybe provide some useful background). I am upvoting. – WestCoastProjects Mar 20 '15 at 18:05
  • Hi, I have a `pairRdd`, is there a way to use `top` method in this `pairRdd`? For example, top(10) will return 10 elements for each key in this 'pairRdd'. I really need to know this. – chrisTina Apr 09 '15 at 13:52
  • No, it does not work like that. I suggest asking a separate question about finding the top 10 per key, as it's a bigger topic. – Daniel Darabos Apr 09 '15 at 16:17
  • https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD – Daniel Darabos Nov 29 '15 at 12:31

Most likely you have already perused the source code:

  class OrderedRDDFunctions {
    // <snip>
    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
      val part = new RangePartitioner(numPartitions, self, ascending)
      val shuffled = new ShuffledRDD[K, V, P](self, part)
      shuffled.mapPartitions(iter => {
        val buf = iter.toArray
        if (ascending) {
          buf.sortWith((x, y) => x._1 < y._1).iterator
        } else {
          buf.sortWith((x, y) => x._1 > y._1).iterator
        }
      }, preservesPartitioning = true)
    }
  }

And, as you say, all of the data must go through the shuffle stage, as the snippet shows.

However, your concern about subsequently invoking take(K) may be misplaced. This operation does NOT scan through all N items:

  /**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   */
  def take(num: Int): Array[T] = {
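The partition-at-a-time scan that docstring describes can be sketched like this (a simplified model in plain Python; the growth factor and helper are assumptions for illustration, not Spark's exact code):

```python
from typing import List

def take(partitions: List[List[int]], num: int) -> List[int]:
    """Scan partitions incrementally, growing how many we read per round."""
    results: List[int] = []
    scanned = 0
    num_to_scan = 1  # start by scanning a single partition
    while len(results) < num and scanned < len(partitions):
        for part in partitions[scanned:scanned + num_to_scan]:
            results.extend(part)
        scanned += num_to_scan
        num_to_scan *= 4  # scale up if the earlier rounds came up short (assumed factor)
    return results[:num]

parts = [[1, 2], [3, 4], [5, 6], [7, 8]]
print(take(parts, 3))  # [1, 2, 3]
```

If the first partition already holds num elements, nothing else is ever touched, which is why take(K) on a sorted RDD is cheap for small K.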

So then, it would seem:

O(myRdd.take(K)) << O(myRdd.sortByKey()) ~= O(myRdd.sortByKey().take(k)) (at least for small K) << O(myRdd.sortByKey().collect())

WestCoastProjects
  • sortByKey(), like other RDD transformations, is subject to lazy evaluation. Would sortByKey.take(k) be optimized into takeOrdered(k, func), or into take(k).sortByKey? I thought that was the whole point of lazy evaluation, so that physical plans can be optimized. Could it be implemented better in DataFrames? – Tagar Jun 21 '15 at 06:15
  • @Ruslan I do not believe that such rearrangements / optimizations presently occur within Spark core. I am only aware of similar optimizations happening within the SQL/Catalyst optimizer. – WestCoastProjects Jun 21 '15 at 15:35

Another option, available since at least PySpark 1.2.0, is takeOrdered.

In ascending order:

rdd.takeOrdered(10)

In descending order:

rdd.takeOrdered(10, lambda x: -x)

Top k values for (k, v) pairs (note that tuple-unpacking lambdas such as lambda (k, v): -v are Python 2 only; in Python 3, index into the tuple instead):

rdd.takeOrdered(10, key=lambda kv: -kv[1])
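Those key functions can be sanity-checked without a Spark cluster: heapq.nsmallest has matching semantics (plain Python, hypothetical data):

```python
import heapq

data = [4, 1, 3, 2, 5]
pairs = [("a", 4), ("b", 1), ("c", 3)]

# Ascending: the num smallest elements, in sorted order.
print(heapq.nsmallest(3, data))                          # [1, 2, 3]
# Descending: negate the key so the largest come first.
print(heapq.nsmallest(3, data, key=lambda x: -x))        # [5, 4, 3]
# Top values for (k, v) pairs: order by -value.
print(heapq.nsmallest(2, pairs, key=lambda kv: -kv[1]))  # [('a', 4), ('c', 3)]
```

Like top, takeOrdered keeps only num candidates per partition, so it is cheaper than a full sortByKey followed by take.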
jruizaranguren