
As per the documentation:

spark.default.parallelism (meaning): Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user

spark.default.parallelism (default value): For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD

I am not able to reproduce the documented behaviour.

Dataframe:

I create two DataFrames with 50 and 3 partitions respectively and join them. The output should have 50 partitions, but it always has the same number of partitions as the DataFrame with more rows (3 here).

df1 = spark.range(1,10000).repartition(50)
df2 = spark.range(1,100000).repartition(3)
df3 = df1.join(df2,df1.id==df2.id,'inner')
df3.rdd.getNumPartitions() #3

RDD:

I create two DataFrames with 50 and 3 partitions respectively and join their underlying RDDs. The output RDD should have 50 partitions, but it has 53.

df1 = spark.range(1,10000).repartition(50)
df2 = spark.range(1,100000).repartition(3)
df3 = df1.rdd.join(df2.rdd)
df3.getNumPartitions() #53

How should I understand the spark.default.parallelism setting?

figs_and_nuts

1 Answer


For now I can answer the DataFrame part. I am still experimenting with RDDs, so I may add an edit later.

For DataFrames there is a parameter, spark.sql.shuffle.partitions, which sets the number of partitions for shuffles during joins and similar operations; it is 200 by default. In your case it is not used, and there may be two reasons:

Your datasets are smaller than 10 MB (the default spark.sql.autoBroadcastJoinThreshold), so one dataset is broadcast and there is no shuffle during the join. You end up with the number of partitions of the bigger dataset.

You may have AQE (adaptive query execution) enabled, which changes the number of partitions.
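To make the two cases concrete, here is a rough sketch in plain Python (no Spark needed). The function `expected_join_partitions` and its parameter names are made up for illustration and are not a Spark API; it only encodes the reasoning above and assumes nothing else changes the plan.

```python
from typing import Optional


def expected_join_partitions(
    streamed_side_partitions: int,
    shuffle_partitions: int = 200,
    broadcast: bool = False,
    aqe_enabled: bool = False,
) -> Optional[int]:
    """Toy model (hypothetical helper, not a Spark API) of the partition
    count of a DataFrame join result."""
    if aqe_enabled:
        # AQE can change the number of partitions at runtime, so the
        # count cannot be predicted from the settings alone.
        return None
    if broadcast:
        # Broadcast join: no shuffle, so the non-broadcast (bigger)
        # side keeps its partitions.
        return streamed_side_partitions
    # Shuffle join: both sides are shuffled into
    # spark.sql.shuffle.partitions partitions.
    return shuffle_partitions


# The question's case: the small side is broadcast, the big side has 3 partitions.
print(expected_join_partitions(3, broadcast=True))          # 3
# Broadcast and AQE disabled, spark.sql.shuffle.partitions set to 100.
print(expected_join_partitions(3, shuffle_partitions=100))  # 100
```

In neither branch does the result come out as 50, which is consistent with what you observed.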

I did a quick check with broadcast joins and AQE disabled, and the results are as expected:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.shuffle.partitions",100)

df1 = spark.range(1,10000).repartition(50)
df2 = spark.range(1,100000).repartition(3)
df3 = df1.join(df2,df1.id==df2.id,'inner')
df3.rdd.getNumPartitions() #100

For the second case, with RDDs, I think the RDDs are co-partitioned and because of that Spark does not trigger a full shuffle; see "Understanding Co-partitions and Co-Grouping In Spark".

What I can see in the query plan is that Spark uses a union instead of a join, and that is why the final RDD has 53 partitions (50 + 3).
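A rough way to see where 53 comes from, again in plain Python. This is only a sketch under two assumptions that I have not verified for every version: that PySpark builds RDD.join as a union of both inputs followed by a groupByKey, and that the groupByKey falls back to the partition count of the unioned RDD when spark.default.parallelism is not set explicitly. The function names are made up for illustration.

```python
from typing import Optional


def union_partitions(left: int, right: int) -> int:
    # Assumed: a union of two RDDs that do not share a partitioner keeps
    # every input partition, so the counts add up.
    return left + right


def rdd_join_partitions(
    left: int, right: int, default_parallelism: Optional[int] = None
) -> int:
    """Toy model (hypothetical helper) of the partition count of a
    PySpark RDD join result."""
    if default_parallelism is not None:
        # Assumed: an explicitly set spark.default.parallelism wins.
        return default_parallelism
    # Otherwise the count of the unioned RDD is reused.
    return union_partitions(left, right)


print(rdd_join_partitions(50, 3))                         # 53
print(rdd_join_partitions(50, 3, default_parallelism=8))  # 8
print(max(50, 3))  # 50, what the documentation's "largest parent" wording suggests
```

If these assumptions hold, setting spark.default.parallelism explicitly when creating the session should make the RDD join return that many partitions instead of 53.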


M_S
  • I got both parts of the answer. Thank you! However, I gotta ask, why would Spark use a union instead of a join? Is it not a really long and inefficient road to an inner join? – figs_and_nuts Dec 11 '22 at 19:25
  • I think it's actually not that bad. In a union of RDDs Spark is not moving data around, it's just a bookkeeping change, so it may be quite efficient; for sure better than a sort-merge join, which requires a shuffle – M_S Dec 11 '22 at 19:44
  • But since these are co-partitioned, beats me why they don't just sort and merge within the same partitions. No shuffle is needed and easier to achieve inner, outer or any join mechanism I think. – figs_and_nuts Dec 11 '22 at 20:21