
Say I have two Spark DataFrames with a column some_col:

df_1 = df_1.repartition(50, 'some_col')
df_2 = df_2.repartition(50, 'some_col')

df_3 = df_1.join(df_2, on='some_col')

I thought that df_3 should also be partitioned by some_col and have 50 partitions, but my experiments show that at least the latter condition is not true. Why does this happen?

And which time-consuming operations (re-partitioning or relocation of data) can happen after

df_3 = df_3.repartition(50, 'some_col')
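
A quick way to see what actually happens is to inspect the partition count and the physical plan right after the join (a minimal sketch, assuming df_1 and df_2 are defined as above):

# Check how df_3 is actually partitioned after the join
print(df_3.rdd.getNumPartitions())  # often equals spark.sql.shuffle.partitions, not 50
df_3.explain()  # look for Exchange hashpartitioning(some_col, ...) nodes in the plan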
AlexanderLedovsky
  • Why do you think `df_3` should have 50 partitions? What does `df_3.rdd.getNumPartitions()` say right after the join? – Mariusz Oct 16 '17 at 17:26
  • I found that `df_3.rdd.getNumPartitions()` is equal to `spark.default.parallelism`. It looks strange... Why does this happen? It seems much easier to do the join within the same partitions! – AlexanderLedovsky Oct 16 '17 at 17:55
  • It's always equal to `spark.sql.shuffle.partitions`; you can read more here: https://stackoverflow.com/questions/41359344/why-is-the-number-of-partitions-after-groupby-200-why-is-this-200-not-some-othe – Mariusz Oct 16 '17 at 20:00
  • But why does a shuffle happen? According to [this answer](https://stackoverflow.com/a/43843509/1849828), a shuffle shouldn't be necessary with the right partitioning, should it? – AlexanderLedovsky Oct 17 '17 at 08:34
  • The shuffle happens because Spark requires 200 partitions (so the data needs to be repartitioned). You can try setting the parameter to 50 and see if shuffling still occurs (I don't know); see the sketch after these comments. – Mariusz Oct 17 '17 at 10:51
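
For anyone who wants to try that suggestion, a minimal sketch (it assumes a SparkSession named spark and the DataFrames from the question):

spark.conf.set('spark.sql.shuffle.partitions', 50)  # match the repartition count
df_3 = df_1.join(df_2, on='some_col')
print(df_3.rdd.getNumPartitions())  # should now report 50
df_3.explain()  # check whether an extra Exchange still appears for the join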

1 Answer


The condition that "df_3 should also be partitioned by some_col and have 50 partitions" will only be true if df_1 and df_2 have partitions with the same values for "some_col", i.e. if df_1 has 2 partitions, [(1,2)] and [(3,1),(3,7)] (such that the some_col values are 1 and 3), then df_2 needs to have partitions with some_col values 1 and 3. If that is the case, then joining df_1 and df_2 produces df_3 with the same number of partitions as df_1 or df_2.

In all other cases Spark will create the default 200 partitions (spark.sql.shuffle.partitions) and shuffle the whole join operation.

For clarity, you can try the following example:

rdd1 = sc.parallelize([(1,2), (1,9), (2, 3), (3,4)])
df1 = rdd1.toDF(['a', 'b'])
df1 = df1.repartition(3, 'a')
df1.rdd.glom().collect() #outputs like:
>> [[Row(a=2,b=3)], [Row(a=3,b=4)], [Row(a=1,b=2), Row(a=1,b=9)]]

df1.rdd.getNumPartitions()
>>3

rdd2 = sc.parallelize([(1,21), (1,91), (2, 31), (3,41)])
df2 = rdd2.toDF(['a', 'b'])
df2 = df2.repartition(3, 'a')
df2.rdd.glom().collect() #outputs like:
>> [[Row(a=2,b=31)], [Row(a=3,b=41)], [Row(a=1,b=21), Row(a=1,b=91)]]

df2.rdd.getNumPartitions()
>>3


df3 = df1.join(df2, on='a')
df3.rdd.glom().collect() #outputs like:
>> [[Row(a=2,b=3,b=31)], [Row(a=3,b=4,b=41)], [Row(a=1,b=2,b=21), Row(a=1,b=9,b=91)]]
df3.rdd.getNumPartitions()
>>3
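
To verify that the join above reused the existing partitioning instead of shuffling again, you can look at the physical plan (a sketch; the exact plan text varies by Spark version):

df3.explain()
# If df1 and df2 are co-partitioned on 'a', the plan should only contain the
# Exchange hashpartitioning(a, 3) nodes produced by the two repartition calls,
# with no additional Exchange inserted for the join itself.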
joshi.n
  • Thx! Actually it's much clearer now. Am I right that if some_col in df1 and df2 does not have the same values (e.g. [1, 3] and [1, 2, 3]), it will always cause a shuffle? – AlexanderLedovsky Oct 22 '17 at 09:41
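
One way to check that case empirically is to build a third DataFrame whose 'a' values differ from df1's and compare the plans (a sketch; df4 and df5 are hypothetical names, not part of the answer):

rdd3 = sc.parallelize([(1, 5), (3, 7), (4, 8)])  # 'a' values [1, 3, 4] vs df1's [1, 2, 3]
df4 = rdd3.toDF(['a', 'b']).repartition(3, 'a')
df5 = df1.join(df4, on='a')
df5.explain()  # does an extra Exchange appear, or is the partitioning reused?
print(df5.rdd.getNumPartitions())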