
I have two large dataframes df1 and df2 partitioned by column a, and I want to efficiently compute a left join on both a and another column b:

df1.join(df2, on=['a', 'b'], how='left_outer')

When written as above, Spark reshuffles both dataframes by key (a, b), which is extremely inefficient. Instead, I would like it to take advantage of the existing partitioning by a to avoid the shuffle (performing the join within each partition), which should be a lot faster (especially since I have further processing steps that benefit from this partitioning).
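
The shuffle is visible in the physical plan, which should show an Exchange hashpartitioning on (a, b) feeding both sides of the SortMergeJoin:

# Inspecting the physical plan: both inputs should get an
# Exchange hashpartitioning(a, b, ...) before the SortMergeJoin.
df1.join(df2, on=['a', 'b'], how='left_outer').explain()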

Is there any way to prevent this shuffle and obtain a resulting dataframe partitioned by a?

Note that if it were an inner join I could do the following, but (1) I'm not sure it would be efficient, and in any case (2) it doesn't work with a left join, since the post-join filter would also drop left-hand rows that should be kept with nulls (I'm only providing it in case it helps someone else):

df1.join(df2, on=['a'], how='inner').filter(df1.b == df2.b)

PS: both dataframes are too large to be broadcast
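
For reference, the broadcast route being ruled out here would look roughly like this; it avoids the shuffle only when one side is small enough to fit on every executor:

from pyspark.sql.functions import broadcast

# Not applicable in this case since both sides are large; a broadcast hash
# join would avoid the shuffle entirely if df2 fit in memory.
df1.join(broadcast(df2), on=['a', 'b'], how='left_outer')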

tiho
  • Did you get an answer for this? – vikrant rana May 14 '19 at 07:48
  • 1
    @vikrantrana sadly no, I ended up doing it manually with the RDD API... – tiho May 16 '19 at 00:51
  • Would you mind sharing the code using the RDD API, maybe as an answer? Thanks in advance – vikrant rana May 16 '19 at 04:32
  • 1
    @vikrantrana Well my code is very specific to my use case, as it is part of a long processing chain. This was almost one year ago too and I don't remember all details, but at high level I used the RDD API to join on 'a' and then processed each partition in pure Python to do the join on 'b' (I think with Pandas) -- partitions were small enough in my use case. – tiho May 21 '19 at 14:24
  • 1
    I also had some issue with joining and it did it via breaking the dataframe into chunks and repartition it and then process each partition serially in a loop. see link below https://stackoverflow.com/questions/53524062/efficient-pyspark-join/53720497#53720497 – vikrant rana May 21 '19 at 14:48
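
Below is a rough sketch of the workaround tiho describes in the comments, assuming each group for a given value of a fits in memory on a single executor and that a and b are the only column names shared by df1 and df2. The helper names (merge_group, keyed1, keyed2, result) are illustrative, not from the original code.

import pandas as pd
from pyspark.sql import Row

# Columns of df2, captured on the driver, so that keys with no match in df2
# can still be padded with the right columns.
cols2 = df2.columns

def merge_group(kv):
    # kv is (a_value, (rows_from_df1, rows_from_df2)) as produced by cogroup
    _, (rows1, rows2) = kv
    pdf1 = pd.DataFrame([r.asDict() for r in rows1])
    pdf2 = pd.DataFrame([r.asDict() for r in rows2])
    if pdf1.empty:
        # key only present in df2: contributes nothing to a left join
        return []
    if pdf2.empty:
        # no match in df2 for this value of 'a': keep df1 rows, pad df2-only columns
        for c in cols2:
            if c not in pdf1.columns:
                pdf1[c] = None
        merged = pdf1
    else:
        # within a single value of 'a', a pandas left merge on ['a', 'b'] is the
        # per-group equivalent of the global left join on (a, b)
        merged = pdf1.merge(pdf2, on=['a', 'b'], how='left')
    return merged.to_dict('records')

keyed1 = df1.rdd.keyBy(lambda r: r['a'])
keyed2 = df2.rdd.keyBy(lambda r: r['a'])
joined = keyed1.cogroup(keyed2).flatMap(merge_group)

# An explicit schema is safer than inference here (None/NaN values confuse it)
result = spark.createDataFrame(joined.map(lambda d: Row(**d)))

This mirrors the comment above: the RDD-level cogroup groups both sides by a only, and the join on b happens in plain Python within each group, preserving unmatched df1 rows as a left join requires.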
