
I've read a lot about how to do efficient joins in PySpark. The ways I've found to achieve efficient joins are basically:

  • Use a broadcast join if you can. (I usually can't because the dataframes are too large; a sketch of what I mean is below this list.)
  • Consider using a very large cluster. (I'd rather not because of $$$).
  • Use the same partitioner.
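
For reference, the broadcast pattern I mean is something like this (table, column, and key names are made up, just to show the shape of it):

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# placeholder tables; the broadcast side must be small enough to fit on every executor
df_large = spark.table('db.big_table')
df_small = spark.table('db.small_lookup')

# broadcast() hints Spark to copy df_small to all executors, so df_large is not shuffled
joined = df_large.join(broadcast(df_small), on='join_key', how='inner')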

The last one is the one I'd rather try, but I can't find a way to do it in PySpark. I've tried:

df.repartition(numberOfPartitions, ['partition_col1', 'partition_col2'])

but it doesn't help; it still takes so long that I end up stopping it, because Spark gets stuck in the last few jobs.
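
To be concrete, what I understand "using the same partitioner" to mean at the DataFrame level is repartitioning both sides on the same columns with the same number of partitions before joining, something like this (table names, column names, and the partition count are made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

numberOfPartitions = 200  # made-up value

# placeholder tables
df1 = spark.table('db.table_1')
df2 = spark.table('db.table_2')

# repartition both dataframes the same way, then join on those same columns
df1 = df1.repartition(numberOfPartitions, 'partition_col1', 'partition_col2')
df2 = df2.repartition(numberOfPartitions, 'partition_col1', 'partition_col2')

joined = df1.join(df2, on=['partition_col1', 'partition_col2'], how='inner')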

So, how can I use the same partitioner in PySpark to speed up my joins, or even get rid of the shuffles that take forever? What code do I need to use?

PS: I've checked other articles, even on Stack Overflow, but I still can't find code examples.

Manrique
  • Have you checked whether the number of rows per partition is comparable? If not, it could be that your calculation gets stuck in one particular partition, in which case a random shuffle for repartitioning would be more advisable. – Alessandro Nov 28 '18 at 16:53
  • Yes, I've done it. Pretty much balanced. – Manrique Nov 28 '18 at 17:59
  • @vikrantrana Hello! Thank you so much for answering me. I'll try it if I need it, but I solved my problem another way (because I figured out the problem was something else). I'll answer my own question in this post, so you can check it out if you are curious. – Manrique Jan 10 '19 at 17:00

2 Answers


You can also use a two-pass approach, in case it suits your requirement. First, re-partition the data and persist it using partitioned tables (dataframe.write.partitionBy()). Then, join the sub-partitions serially in a loop, "appending" to the same final result table. This was nicely explained by Sim; see the link below:

two pass approach to join big dataframes in pyspark

Based on the approach explained above, I was able to join the sub-partitions serially in a loop and then persist the joined data to a Hive table.

Here is the code.

from pyspark.sql.functions import col

# add a par_id bucket column (emp_id modulo 5), then write each dataframe as an ORC table partitioned on par_id
emp_df_1.withColumn("par_id", col('emp_id') % 5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_1")
emp_df_2.withColumn("par_id", col('emp_id') % 5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_2")

So, if you are joining on an integer emp_id, you can partition by the ID modulo some number. This way you redistribute the load across the Spark partitions, and records with similar keys are grouped together and reside on the same partition. You can then read and loop through each sub-partition, join the two dataframes, and persist the result.

counter = 0
paritioncount = 4
while counter <= paritioncount:
    # read one par_id bucket from each partitioned table
    query1 = "SELECT * FROM UDB.temptable_1 where par_id={}".format(counter)
    query2 = "SELECT * FROM UDB.temptable_2 where par_id={}".format(counter)
    EMP_DF1 = spark.sql(query1)
    EMP_DF2 = spark.sql(query2)
    df1 = EMP_DF1.alias('df1')
    df2 = EMP_DF2.alias('df2')
    # join the matching buckets and append the result to the final table
    # (UDB.temptable must already exist for insertInto to append to it)
    innerjoin_EMP = df1.join(df2, df1.emp_id == df2.emp_id, 'inner').select('df1.*')
    innerjoin_EMP.show()
    innerjoin_EMP.write.format('orc').insertInto("UDB.temptable")
    counter = counter + 1

I have tried this and it works fine. This is just an example to demonstrate the two-pass approach; your join conditions may vary, and the number of partitions will also depend on your data size.

vikrant rana
  • Hi Vikrant. I hope that you are fine. I read your answer and tried to implement it, but I have a few questions, if I may - 1. You say `similar records will be sharing same partition id on both the dataframes`, but on this [link](https://stackoverflow.com/questions/28395376/does-a-join-of-co-partitioned-rdds-cause-a-shuffle-in-apache-spark) Daniel Darabos in his answer says that `It's possible for two RDDs to have the same partitioner (be co-partitioned) yet have the corresponding partitions located on different nodes (not be co-located).` Question is below - – cph_sto Jul 16 '19 at 13:03
  • For 2 `DFs` - Does it mean that a particular ID will always have the same partition number for both `DFs`, even though it is possible that these partitions may be different and on different machines? In other words, the two-pass approach you described doesn't guarantee co-location? Please read the comment by Giorgio and Daniel's comments after that. In your case, even though the IDs are not co-located (on different partitions), all partition data for `DF1` may move to the corresponding partitions of `DF2`, but this movement of data is not called a `shuffle` and is not cost-intensive? – cph_sto Jul 16 '19 at 13:15
  • No, it will not be co-partitioned; it's just that you are breaking your big dataframe into small chunks, and within the same dataframe you have grouped similar keys into one partition, but the other dataframe's keys will be on a different partition. – vikrant rana Jul 16 '19 at 13:29
  • Good to hear from you. I am doing fine as well. This partitioning thing is banging my head ;) Maybe I am understanding it now - You use `.partitionBy()` to break the data into chunks using the `modulo` function, so all keys with the same modulo value will be written (`.write.format...`) to the same folder, as described [here](https://stackoverflow.com/questions/40416357/spark-sql-difference-between-df-repartition-and-dataframewriter-partitionby) in the answer by conradlee. Then you import both dataframes chunk-wise, join them, and insert them into the main table, right? – cph_sto Jul 16 '19 at 14:48
  • Just one last question - While you were writing with `.saveAsTable()`, was it necessary to repartition then? Could it not be done after you import the chunks, inside the `while` statement? Or does it make no difference? Just a small typo in your post: since the counter is 0 with 5 partitions, I think you can remove the `=` sign in `while counter<=paritioncount:`, as the modulo can only take `0,1,2,3,4` :) – cph_sto Jul 16 '19 at 15:02
  • Hi Vikrant, I don't have a LinkedIn account, but I would add you if I create one. Many thanks!! – cph_sto Jul 16 '19 at 17:56
  • @vikrantrana I think you need an inline loop to iterate over the partitions of the right side of the join operation. – bib Jul 18 '19 at 13:14
  • @vikrantrana I think you need an inline loop to iterate over the partitions of the right side of the join operation. By the way, I have two dataframes with one column, id, each, and each has 3577 rows. I want to join them based on the condition id1 != id2. Normally I should get 12794929, but using your approach I get 2584430. – bib Jul 18 '19 at 14:31
  • @bib Not yet. I got stuck with other issues currently. I will take a look into this. You are trying to perform a right join of two dataframes, right? – vikrant rana Jul 27 '19 at 13:01

Thank you @vikrantrana for your answer, I will try it if I ever need it. I say this because I found out the problem wasn't the 'big' joins; the problem was the amount of calculation prior to the join. Imagine this scenario:

I read a table and store it in a dataframe called df1. I read another table and store it in df2. Then I perform a huge number of calculations and joins on both, and I end up with a join between df1 and df2. The problem here wasn't the size; the problem was that Spark's execution plan was huge and it couldn't keep all the intermediate tables in memory, so it started writing to disk and it took so much time.

The solution that worked for me was to persist df1 and df2 to disk before the join (I also persisted other intermediate dataframes that were the result of big and complex calculations).
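
A minimal sketch of the idea (table names and the join key are placeholders, and I'm assuming StorageLevel.DISK_ONLY since what I did was persist to disk):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# placeholder dataframes standing in for the real ones, each built from
# many expensive transformations and intermediate joins
df1 = spark.table('db.table_1')
df2 = spark.table('db.table_2')

# persist to disk and force materialization with an action, so the heavy upstream
# work is computed once and the final join reads the persisted data
df1 = df1.persist(StorageLevel.DISK_ONLY)
df1.count()
df2 = df2.persist(StorageLevel.DISK_ONLY)
df2.count()

result = df1.join(df2, on='id', how='inner')  # 'id' is a placeholder join key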

Manrique
  • I also had the same problem with expensive transformations, and it failed at the persist() step; I couldn't even perform that one! Please check here if you can help @manrique https://stackoverflow.com/questions/54653298/when-is-it-not-performance-practical-to-use-persist-on-a-spark-dataframe – SarahData Feb 15 '19 at 13:29