
By default, the Hadoop MapReduce shuffle sorts keys within each partition, but not across partitions (total ordering is what makes keys sorted across partitions).

I would like to know how to achieve the same thing with a Spark RDD (sort within partitions, but not across partitions).

  1. RDD's sortByKey method does a total ordering.
  2. RDD's repartitionAndSortWithinPartitions sorts within partitions but not across them; unfortunately, it adds an extra repartition step.

Is there a direct way to sort within partitions but not across partitions?
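
To make the distinction concrete, here is a plain-Scala model (no Spark needed) in which a dataset's partitions are represented as a Seq of Seqs. The names OrderingModel, sortWithinPartitions and totalOrder are hypothetical. The total-ordering side is an idealized range split of the fully sorted data into equally sized chunks; Spark's RangePartitioner picks its bounds by sampling, so real partition sizes can differ.

```scala
object OrderingModel {
  // Partitions of a dataset, modelled as a sequence of sequences.
  type Partitions[T] = Seq[Seq[T]]

  // Within-partition sort: each partition is sorted on its own,
  // and records never move between partitions.
  def sortWithinPartitions[T: Ordering](parts: Partitions[T]): Partitions[T] =
    parts.map(_.sorted)

  // Idealized total ordering: sort everything, then cut the result into
  // contiguous ranges, so every key in partition i is <= every key in partition i + 1.
  def totalOrder[T: Ordering](parts: Partitions[T]): Partitions[T] = {
    val all = parts.flatten.sorted
    val n = parts.size
    val size = math.ceil(all.size.toDouble / n).toInt
    all.grouped(size max 1).toList.padTo(n, Seq.empty[T])
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq("e", "d", "f"), Seq("b", "c", "a"))
    println(sortWithinPartitions(parts)) // List(List(d, e, f), List(a, b, c))
    println(totalOrder(parts))           // List(List(a, b, c), List(d, e, f))
  }
}
```

Note that the within-partition result is not globally sorted when the partitions are concatenated, which is exactly the Hadoop-style behavior the question asks for.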

Jacek Laskowski
Tom

2 Answers


You can use a Dataset and its sortWithinPartitions method:

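// In spark-shell, `spark` (SparkSession) and `sc` (SparkContext) are predefined
import spark.implicits._

// Two partitions: ("e", "d", "f") and ("b", "c", "a")
sc.parallelize(Seq("e", "d", "f", "b", "c", "a"), 2)
  .toDF("text")
  .sortWithinPartitions($"text")
  .show()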

+----+
|text|
+----+
|   d|
|   e|
|   f|
|   a|
|   b|
|   c|
+----+

In general, the shuffle is an important factor in sorting partitions, because Spark reuses the shuffle's data structures to sort without loading all the data into memory at once.
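
The general idea of sorting without holding everything in memory can be sketched with a classic external merge sort: sort bounded chunks into runs, then merge the runs lazily. This is a simplified illustration, not Spark's actual ExternalSorter. The object name and the maxInMemory parameter are hypothetical, and the sorted runs are kept in memory here as stand-ins for the spill files a real external sorter would write to disk.

```scala
import scala.collection.mutable

object ExternalSortSketch {
  // Sorts `input` in chunks of at most `maxInMemory` records. Each sorted
  // chunk ("run") stands in for a spill file; a real external sorter would
  // write runs to disk instead of keeping them in memory as done here.
  def externalSort[T](input: Iterator[T], maxInMemory: Int)(implicit ord: Ordering[T]): Iterator[T] = {
    require(maxInMemory > 0, "maxInMemory must be positive")

    // Phase 1: cut the input into bounded chunks and sort each one.
    val runs: Vector[Iterator[T]] =
      input.grouped(maxInMemory).map(chunk => chunk.sorted.iterator).toVector

    // Phase 2: k-way merge. The heap holds the current head of each run,
    // tagged with its run index. PriorityQueue is a max-heap, so the
    // ordering is reversed to pop the smallest element first.
    val heap = mutable.PriorityQueue.empty[(T, Int)](Ordering.by[(T, Int), T](_._1).reverse)
    runs.zipWithIndex.foreach { case (run, i) =>
      if (run.hasNext) heap.enqueue((run.next(), i))
    }

    new Iterator[T] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): T = {
        val (value, i) = heap.dequeue()
        val run = runs(i)
        // Refill the heap from the run that just produced the smallest element.
        if (run.hasNext) heap.enqueue((run.next(), i))
        value
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Iterator(5, 3, 9, 1, 7, 2, 8, 6, 4)
    println(externalSort(data, maxInMemory = 4).toList) // List(1, 2, 3, 4, 5, 6, 7, 8, 9)
  }
}
```

During the merge phase only one element per run sits in the heap, which is what lets this kind of sort scale past memory once the runs live on disk.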

Shaido
user7849215
    Thanks @user7849215 for the helpful answer. Is there a way to do sortWithinPartitions with a raw RDD? – Tom Apr 11 '17 at 08:16

I've never had this need before, but my first guess would be to use any of the *Partition* methods (e.g. foreachPartition or mapPartitions) to do the sorting within every partition.

Since they give you a Scala Iterator, you could use it.toSeq and then apply any of the sorting methods of Seq, e.g. sortBy or sortWith or sorted.
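
A minimal sketch of that suggestion, assuming the whole partition fits in memory (it.toVector buffers the entire partition before sorting, which is the OOM risk raised in the comments). The helper name sortPartition is hypothetical; it is the kind of function you would pass to RDD.mapPartitions, and here it is exercised on plain iterators so the snippet runs without Spark.

```scala
object SortPartitionSketch {
  // Hypothetical helper: sorts one partition's records in memory.
  // Intended to be passed to RDD.mapPartitions; toVector materializes
  // the whole partition before sorting.
  def sortPartition[T: Ordering](it: Iterator[T]): Iterator[T] =
    it.toVector.sorted.iterator

  def main(args: Array[String]): Unit = {
    // Two "partitions" modelled as plain iterators, mirroring how
    // sc.parallelize(Seq("e", "d", "f", "b", "c", "a"), 2) splits its input.
    val partitions = Seq(Iterator("e", "d", "f"), Iterator("b", "c", "a"))
    partitions.map(p => sortPartition(p).toList).foreach(println)
    // List(d, e, f)
    // List(a, b, c)
  }
}
```

With Spark on the classpath, the same function plugs in as `rdd.mapPartitions(it => SortPartitionSketch.sortPartition(it), preservesPartitioning = true)`. Passing `preservesPartitioning = true` is reasonable here because sorting never moves a record to another partition. For a pair RDD you would typically sort by key only, e.g. `it.toVector.sortBy(_._1).iterator`.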

Jacek Laskowski
    If the data in a partition is very large, could a Scala in-memory sort cause an OOM? – Tom Nov 13 '18 at 09:53
  • Correct! Are you looking for a dump-to-disk-on-demand version of sort in Spark? – Jacek Laskowski Nov 13 '18 at 10:59
  • Thanks @jacek-laskowski, I am really wondering why RDD#repartitionAndSortWithinPartitions is implemented as a repartition first and then a sort. As far as I know, repartition (the shuffle) doesn't involve sorting at all: sort-based shuffle only sorts by partitionId. – Tom Nov 13 '18 at 11:15
  • My guess is that it is to make sure that all elements for the given expression(s) are in the same partition? I'm exploring the SortExec physical operator in Spark SQL as we speak, so I might have more details in the coming days. – Jacek Laskowski Nov 13 '18 at 13:03
  • Thanks @jacek-laskowski. I looked at SortExec: when doing a within-partition sort (not a global sort), it places no requirement on its child's data distribution; it just asks for UnspecifiedDistribution. – Tom Nov 14 '18 at 00:44
  • Correct. Does this make sense? How does that change our conversation? I'm asking since I'm still in an exploration mode. – Jacek Laskowski Nov 14 '18 at 06:12
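
The question in this thread, where the sort actually happens, can be illustrated with a toy model of a sort-based shuffle. This is a deliberate simplification, not Spark's code. It assumes, as Tom describes, that the map side orders records only by target partition id, and that a key ordering, when one is present, is applied as each reduce partition is read. In Spark, repartitionAndSortWithinPartitions builds a ShuffledRDD and sets a key ordering on it, and its scaladoc says this is more efficient than repartitioning and then sorting because the sort can be pushed down into the shuffle machinery. The names SortShuffleModel, partitionOf, mapSideWrite and reduceSideRead are hypothetical, and partitionOf is a simple hash-mod stand-in for a real partitioner.

```scala
object SortShuffleModel {
  // Hypothetical partitioner: non-negative hash of the key modulo numPartitions.
  def partitionOf[K](key: K, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Map side: records are ordered only by target partition id. sortBy is
  // stable, so keys inside one partition keep their arrival order.
  def mapSideWrite[K, V](records: Seq[(K, V)], numPartitions: Int): Seq[(Int, (K, V))] =
    records.map(r => (partitionOf(r._1, numPartitions), r)).sortBy(_._1)

  // Reduce side: gather one partition's records, and sort them by key only
  // if a key ordering was supplied (the repartitionAndSortWithinPartitions case).
  def reduceSideRead[K, V](written: Seq[(Int, (K, V))], partition: Int,
                           keyOrdering: Option[Ordering[K]]): Seq[(K, V)] = {
    val records = written.collect { case (p, r) if p == partition => r }
    keyOrdering match {
      case Some(ord) => records.sortBy(_._1)(ord)
      case None      => records
    }
  }

  def main(args: Array[String]): Unit = {
    val records = Seq(6 -> "f", 2 -> "b", 5 -> "e", 4 -> "d", 1 -> "a", 3 -> "c")
    val written = mapSideWrite(records, numPartitions = 2)
    for (p <- 0 until 2) {
      println(s"plain repartition, partition $p: ${reduceSideRead(written, p, None)}")
      println(s"with key ordering, partition $p: ${reduceSideRead(written, p, Some(Ordering[Int]))}")
    }
  }
}
```

In this model a plain repartition leaves keys in arrival order within each partition, and only the run with a key ordering yields sorted partitions; the sort is part of reading the shuffle output rather than a separate pass over it.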