
How does Spark determine the number of partitions after using an orderBy? I always thought that the resulting dataframe has spark.sql.shuffle.partitions partitions, but this does not seem to be true:

val df = (1 to 10000).map(i => ("a",i)).toDF("n","i").repartition(10).cache

df.orderBy($"i").rdd.getNumPartitions // = 200 (=spark.sql.shuffle.partitions)
df.orderBy($"n").rdd.getNumPartitions // = 2 

In both cases, Spark does +- Exchange rangepartitioning(i/n ASC NULLS FIRST, 200), so how can the resulting number of partitions in the second case be 2?
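(For reference, the plan line above is what explain prints; a minimal sketch, with the output omitted here because its exact wording varies by Spark version:)

df.orderBy($"n").explain() // shows Exchange rangepartitioning(n ASC NULLS FIRST, 200)
df.orderBy($"i").explain() // same plan shape for i, also targeting 200 partitions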

Raphael Roth
  • I compared both with explain extended, which gave me all the plans. The plans are the same; the only difference is that n is a string and i is a number. While doing the RangePartitioning internally, Spark seems to differ based on the data type. – Ram Ghadiyaram Dec 14 '18 at 22:20
  • One thing I noticed: if you use `df.orderBy($"n",$"i")` then the partition count is 200 again, so this is purely something happening inside the Spark API based on the data type. – Ram Ghadiyaram Dec 14 '18 at 23:02
  • Got 5 when unioning 4 DFs with a, b, c, d respectively, as per your example. The answer must be in the code. – thebluephantom Dec 16 '18 at 13:30

2 Answers


spark.sql.shuffle.partitions is used as an upper bound. The final number of partitions satisfies 1 <= partitions <= spark.sql.shuffle.partitions.
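For example (a minimal spark-shell sketch reusing the df from the question; exact counts can vary slightly with sampling and Spark version), lowering spark.sql.shuffle.partitions lowers that cap:

spark.conf.set("spark.sql.shuffle.partitions", "20")

df.orderBy($"i").rdd.getNumPartitions // now at most 20 instead of at most 200
df.orderBy($"n").rdd.getNumPartitions // still tiny: "n" has a single distinct value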


As you've mentioned, sorting in Spark goes through RangePartitioner. What it tries to achieve is to partition your dataset into a specified number (spark.sql.shuffle.partitions) of roughly equal ranges.

There's a guarantee that equal values end up in the same partition after the partitioning. It's worth checking the documentation of the RangePartitioning class (not part of the public API):

...

All rows where the expressions in ordering evaluate to the same values will be in the same partition

And if the number of distinct ordering values is less than the desired number of partitions, i.e. the number of possible ranges is less than spark.sql.shuffle.partitions, you'll end up with a smaller number of partitions. Also, here's a quote from the RangePartitioner Scaladoc:

The actual number of partitions created by the RangePartitioner might not be the same as the partitions parameter, in the case where the number of sampled records is less than the value of partitions.

Going back to your example, n is a constant ("a"), so it cannot be split into more than one range. i, on the other hand, has 10,000 possible values and is partitioned into 200 (= spark.sql.shuffle.partitions) ranges or partitions.
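To make that concrete, here is a hedged sketch that feeds the same constant-key data directly into RangePartitioner, the partitioner behind this exchange (variable names are just for illustration). With a single distinct key only one range boundary is possible, so the partitioner settles on 2 partitions, the second of which stays empty, matching the 2 observed in the question:

import org.apache.spark.RangePartitioner

// Ask for 200 partitions over a key with a single distinct value ("a"):
// only one boundary can be picked, so rangeBounds has length 1, i.e. 2
// partitions, and the upper range holds no rows.
val constKeyPairs = spark.sparkContext.parallelize((1 to 10000).map(i => ("a", i)), 10)
val rp = new RangePartitioner(200, constKeyPairs)
rp.numPartitions // 2 here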

Note that this is only true for the DataFrame/Dataset API. When using an RDD's sortByKey you can either specify the number of partitions explicitly, or Spark will use the current number of partitions.
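A hedged RDD-level sketch of that contrast (names are illustrative; exact counts can vary slightly with sampling):

val intKeyPairs = spark.sparkContext.parallelize((1 to 10000).map(i => (i, "x")), 10)

intKeyPairs.sortByKey().getNumPartitions // defaults to the RDD's current partition count, 10 here
intKeyPairs.sortByKey(numPartitions = 4).getNumPartitions // 4, as requested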


Sergey Khudyakov
  • He got 2 partitions for the constant a. The first quoted block states otherwise. – thebluephantom Dec 30 '18 at 07:23
  • @thebluephantom 2 partitions but all _rows_ will end up in the first partition. You can check with `df.orderBy("n").rdd.mapPartitionsWithIndex((idx, iter) => if (idx == 0) iter else Iterator()).count()` – Sergey Khudyakov Dec 30 '18 at 08:35
  • So, an empty 2nd partition. That is reassuring. The question he asks, then, is why 2 partitions, 1 of them empty? I got 5 with 4 distinct values. Looks understandable then. – thebluephantom Dec 30 '18 at 08:39

I ran various tests to look at this more empirically, in addition to looking at range partitioning for sorting, which is the crux of the matter here. See How does range partitioner work in Spark?

Having experimented with a single distinct value for "n" (as in the example in the question) as well as several distinct values, and with various dataframe sizes, the behaviour of df.orderBy($"n") is clear. The calculation that determines the number of partitions holding the ranges of data for the subsequent sort (done via mapPartitions) is based on sampling the existing partitions and then computing a heuristically optimal number of partitions for those ranges. In most cases it computes, and thus generates, N+1 partitions, whereby partition N+1 is empty.

The fact that the extra allocated partition is nearly always empty makes me think there is a calculation error in the code somewhere; in other words, a small bug, imho.

I base this on the following simple test, which does return what I suspect RR (the OP) would consider the proper number of partitions:

val df_a1 = (1 to 1).map(i => ("a",i)).toDF("n","i").cache
val df_a2 = (1 to 1).map(i => ("b",i)).toDF("n","i").cache
val df_a3 = (1 to 1).map(i => ("c",i)).toDF("n","i").cache
val df_b = df_a1.union(df_a2)
val df_c = df_b.union(df_a3)

df_c.orderBy($"n")
 .rdd
 .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
 .toDF("partition_number","number_of_records")
 .show(100,false)

returns:

+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
|0               |1                |
|1               |1                |
|2               |1                |
+----------------+-----------------+

This boundary example's calculation is rather simple. As soon as I use 1 to 2, or 1 to N, for any of the "n" values, an extra empty partition results (a reproduction sketch follows the table):

+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
|0               |2                |
|1               |1                |
|2               |1                |
|3               |0                |
+----------------+-----------------+
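For reference, the table above comes from a variation like the following (a hedged sketch: only "a" gets a second row; df_a2 and df_a3 are the single-row dataframes defined earlier):

// Same test as before, but "a" now has two rows: three ranges suffice,
// yet a fourth, empty partition is still allocated.
val df_a1_two = (1 to 2).map(i => ("a",i)).toDF("n","i").cache
val df_c_two = df_a1_two.union(df_a2).union(df_a3)

df_c_two.orderBy($"n")
 .rdd
 .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
 .toDF("partition_number","number_of_records")
 .show(100,false)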

The sorting requires all the data for a given "n", or set of "n" values, to be in the same partition.
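A quick hedged check of that guarantee, run against the df_c from the test above: every distinct value of "n" should report exactly one partition.

import org.apache.spark.sql.functions.countDistinct

// For each value of "n", count how many partitions its rows landed in
// after the orderBy; range partitioning should report 1 for every value.
df_c.orderBy($"n")
 .rdd
 .mapPartitionsWithIndex{case (idx, rows) => rows.map(r => (r.getString(0), idx))}
 .toDF("n", "partition_number")
 .groupBy("n")
 .agg(countDistinct("partition_number").as("partitions_holding_n"))
 .show(false)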

thebluephantom