
Many posts contain some form of the statement below, usually in response to a question about shuffling or partitioning caused by a JOIN, an aggregation, and so on:

... In general whenever you do a spark sql aggregation or join which shuffles data this is the number of resulting partitions = 200. This is set by spark.sql.shuffle.partitions. ...

So, my question is:

Suppose we have set the partitioning of a DF to 765, for example. Does the statement mean:

  • that the processing runs against 765 partitions, but the output is coalesced / repartitioned to the default 200 (this is how I read the word "resulting")?
  • or that the data is coalesced / repartitioned to 200 partitions before the JOIN or aggregation, so that the processing itself runs on 200 partitions?

I ask because I have never seen a clear answer.

I did the following test:

// genned a DS of some 20M short rows
df0.count
val ds1 = df0.repartition(765)
ds1.count
val ds2 = df0.repartition(765)
ds2.count

sqlContext.setConf("spark.sql.shuffle.partitions", "765")
// The setConf line above was omitted on the 1st run and included on the 2nd run.

ds1.rdd.partitions.size
ds2.rdd.partitions.size

val joined = ds1.join(ds2, ds1("time_asc") === ds2("time_asc"), "outer") 
joined.rdd.partitions.size
joined.count
joined.rdd.partitions.size

On the 1st run, without sqlContext.setConf("spark.sql.shuffle.partitions", "765"), the processing used 200 partitions and the result had 200 partitions.

On the 2nd run, with sqlContext.setConf("spark.sql.shuffle.partitions", "765"), the processing used 765 partitions and the result had 765 partitions.

In both cases this holds even though SO post 45704156 states that the setting may not apply to DFs; this is a DS.

thebluephantom

2 Answers


It is a combination of both your guesses.

Assume you have a set of input data with M partitions and you set shuffle partitions to N.

When executing a join, Spark reads your input data in all M partitions and reshuffles it by key into N partitions. Imagine a trivial hash partitioner: the hash function applied to the key looks roughly like A = hashcode(key) % N, and each row is then sent to the node in charge of handling the A-th partition. Each node can be in charge of multiple partitions.
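
The trivial hash partitioner described above can be sketched in plain Scala, without Spark. This is only an illustrative model: the names HashPartitionSketch, partitionFor and shuffle are made up for this example, and Spark SQL's real partitioning uses its own hash function rather than hashCode. The sketch shows M = 4 input partitions being redistributed into N = 3 shuffle partitions, with every row for a given key landing in the same target partition.

```scala
object HashPartitionSketch {
  // Hypothetical stand-in for a hash partitioner: maps a key to an index in [0, n).
  // hashCode can be negative, so a negative remainder is shifted back into range.
  def partitionFor(key: Any, n: Int): Int = {
    val raw = key.hashCode % n
    if (raw < 0) raw + n else raw
  }

  // Redistributes the rows of all input partitions into n shuffle partitions by key.
  // The number of input partitions does not influence the number of output partitions.
  def shuffle[K, V](input: Seq[Seq[(K, V)]], n: Int): Vector[Seq[(K, V)]] = {
    val byTarget = input.flatten.groupBy { case (k, _) => partitionFor(k, n) }
    Vector.tabulate(n)(i => byTarget.getOrElse(i, Seq.empty[(K, V)]))
  }

  def main(args: Array[String]): Unit = {
    // M = 4 input partitions with keys scattered across them
    val input = Seq(
      Seq(1 -> "a", 2 -> "b"),
      Seq(3 -> "c", 1 -> "d"),
      Seq(4 -> "e"),
      Seq(2 -> "f", 5 -> "g")
    )
    // N = 3 shuffle partitions
    val shuffled = shuffle(input, 3)
    shuffled.zipWithIndex.foreach { case (rows, i) => println(s"partition $i: $rows") }
  }
}
```

Both rows with key 1 start in different input partitions but end up in the same shuffle partition, which is what lets the join or aggregation that follows run without further data movement.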

After shuffling, the nodes will work to aggregate the data in partitions they are in charge of. As no additional shuffling needs to be done here, the nodes can produce the output directly.
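
As a rough illustration of that step, the plain-Scala sketch below (no Spark; PostShuffleAggregationSketch and aggregatePartition are hypothetical names, and the data is invented) starts from N = 3 partitions as they might look after the shuffle and aggregates each one independently. Because every key lives in exactly one partition, no partition needs to consult another, and the output has as many partitions as the shuffle produced.

```scala
object PostShuffleAggregationSketch {
  // Sums the values per key inside a single partition; no other partition is read.
  def aggregatePartition(rows: Seq[(String, Int)]): Map[String, Int] =
    rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // Assumed post-shuffle state: N = 3 partitions, each key present in only one of them
    val shuffled = Vector(
      Seq("a" -> 1, "a" -> 4, "d" -> 2),
      Seq("b" -> 3),
      Seq("c" -> 5, "c" -> 6)
    )
    // One independent aggregation per shuffle partition
    val output = shuffled.map(aggregatePartition)
    println(output.length) // prints 3: same partition count as after the shuffle
    output.zipWithIndex.foreach { case (agg, i) => println(s"partition $i: $agg") }
  }
}
```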

So, in summary, your output ends up in N partitions, but that is because it is processed in N partitions, not because Spark applies an additional shuffle stage specifically to repartition your output data to N.

HaoYuan

spark.sql.shuffle.partitions is the parameter that decides the number of partitions used for shuffles such as joins or aggregations, i.e. wherever data moves across the nodes. The other setting, spark.default.parallelism, is calculated on the basis of your data size and the max block size, which in HDFS is 128 MB. So if your job does not do any shuffle, it uses the default parallelism value, or, if you are using an RDD, you can set it yourself. When a shuffle happens, it uses 200 partitions by default.

// Assumes spark-shell, where sc and the toDF implicits are already in scope.
val df = sc.parallelize(List(1, 2, 3, 4, 5), 4).toDF()
df.count() // this will use 4 partitions

val df1 = df
df1.except(df).count() // will generate 200 partitions, in 2 stages

Chandan Ray
  • But I gave an example of 765 partitions. I mention this because there is ambiguity in many postings and the question comes up again and again. I am going to run a test tonight and will get back. – thebluephantom Aug 21 '18 at 15:35
  • Let me try with 765 – Chandan Ray Aug 21 '18 at 15:35
  • I tried the same example with 765 partitions. The first instance, with the minus query, generated 766 tasks: 765 tasks, one per partition, and one extra for combining the final result. Then the except generated 1166 tasks: 765 tasks to map the partitions, 200 each for the other 2 shuffle stages, and 1 for combining. – Chandan Ray Aug 21 '18 at 15:45
  • That is a better insight. I will do an extra test and get back after dinner. – thebluephantom Aug 21 '18 at 16:13
  • Sure plz let me know your observation – Chandan Ray Aug 21 '18 at 16:16
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/178465/discussion-between-thebluephantom-and-chandan). – thebluephantom Aug 21 '18 at 16:39
  • The answer is correct but can be embellished with my analysis above. – thebluephantom Aug 21 '18 at 19:42