The condition that "df_3 should also be partitioned by some_col and have 50 partitions" holds only if df_1 and df_2 are co-partitioned on "some_col": both are partitioned by that column into the same number of partitions, so rows with the same some_col value sit in corresponding partitions. For example, if df_1 has 2 partitions, [(1,2)] and [(3,1),(3,7)] (some_col values 1 and 3), then df_2 needs partitions that hold the some_col values 1 and 3 in the same way. If that is the case, joining df_1 and df_2 produces df_3 with the same number of partitions as df_1 and df_2.
In other cases, Spark typically shuffles the data for the join and creates 200 partitions, which is the default value of spark.sql.shuffle.partitions.
For clarity, you can try the following example:
rdd1 = sc.parallelize([(1,2), (1,9), (2, 3), (3,4)])
df1 = rdd1.toDF(['a', 'b'])
df1 = df1.repartition(3, 'a')
df1.rdd.glom().collect() #outputs like:
>> [[Row(a=2,b=3)], [Row(a=3,b=4)], [Row(a=1,b=2), Row(a=1,b=9)]]
df1.rdd.getNumPartitions()
>>3
rdd2 = sc.parallelize([(1,21), (1,91), (2, 31), (3,41)])
df2 = rdd2.toDF(['a', 'b'])
df2 = df2.repartition(3, 'a')
df2.rdd.glom().collect() #outputs like:
>> [[Row(a=2,b=31)], [Row(a=3,b=41)], [Row(a=1,b=21), Row(a=1,b=91)]]
df2.rdd.getNumPartitions()
>>3
df3 = df1.join(df2, on='a')
df3.rdd.glom().collect() #outputs like:
>> [[Row(a=2,b=3,b=31)], [Row(a=3,b=4,b=41)], [Row(a=1,b=2,b=21), Row(a=1,b=2,b=91), Row(a=1,b=9,b=21), Row(a=1,b=9,b=91)]]
df3.rdd.getNumPartitions()
>>3
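
To see why co-partitioning lets the join keep the partition count, here is a minimal plain-Python sketch of the idea, not Spark's actual implementation. The helpers hash_partition and join_copartitioned are hypothetical names made up for illustration, and Python's built-in hash() stands in for whatever hash function Spark uses internally. When both inputs are split by the same key into the same number of partitions, partition i of one side only ever needs partition i of the other, so no rows have to move.

```python
from collections import defaultdict

def hash_partition(rows, num_partitions, key_index=0):
    """Place each row in a partition chosen by hashing its key.
    Illustrative stand-in only; Spark's real partitioner hashes differently."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key_index]) % num_partitions].append(row)
    return partitions

def join_copartitioned(left_parts, right_parts):
    """Inner-join partition i of the left side with partition i of the right side.
    No row leaves its partition, so the output keeps the same partition count."""
    if len(left_parts) != len(right_parts):
        raise ValueError("inputs are not co-partitioned; a shuffle would be needed")
    joined = []
    for left, right in zip(left_parts, right_parts):
        # Index the right partition by key, then probe it with each left row.
        by_key = defaultdict(list)
        for key, value in right:
            by_key[key].append(value)
        joined.append([(key, lv, rv) for key, lv in left for rv in by_key[key]])
    return joined

# Same data as df1 and df2 in the example above, as (a, b) tuples.
rows1 = [(1, 2), (1, 9), (2, 3), (3, 4)]
rows2 = [(1, 21), (1, 91), (2, 31), (3, 41)]

parts1 = hash_partition(rows1, 3)
parts2 = hash_partition(rows2, 3)
parts3 = join_copartitioned(parts1, parts2)

print(len(parts3))  # prints 3
```

If the two sides were split into different numbers of partitions, the same key could land at different partition indexes on each side, which is the situation where a shuffle is needed before the join.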