
I am trying to optimise a join in my Spark application.

I tried to understand the points from this question: How to avoid shuffles while joining DataFrames on unique keys?

  1. I have made sure that the keys on which the join has to happen are distributed within the same partitions (using my custom partitioner).

  2. I also cannot do a broadcast join because my data may become large depending on the situation.

  3. In the answer to the above-mentioned question, repartitioning only optimises the join, but what I need is a join WITHOUT A SHUFFLE. I am fine with the join operating on the keys within each partition.

Is it possible? I want to implement something like joinperpartition if similar functionality does not exist.

JohnAster
  • You are joining 2 datasets together. Can't both datasets fit in memory? Are both BIG? – Mahmoud Hanafy Dec 08 '18 at 09:21
  • DF or RDD? If DF, how so a custom partitioner? – thebluephantom Dec 08 '18 at 17:19
  • Both of my datasets can vary; either of them can vary in content size. If the DFs have the same partitioner, does the join happen without a shuffle? – JohnAster Dec 09 '18 at 12:41
  • @thebluephantom How can one apply a custom partitioner to a DF? As I remember, it can only be applied to a PairRDD – JohnAster Dec 14 '18 at 09:44
  • That was exactly my point @JohnAster – thebluephantom Dec 14 '18 at 09:50
  • @thebluephantom Doubt: when I apply repartition to a DF, I have read that Spark uses a HashPartitioner to do this. But when I look at the output of df.rdd.partitioner, I get None. Do you know the logic behind this behaviour? – JohnAster Dec 14 '18 at 11:47
  • https://hackernoon.com/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4 You can also do repartitionByRange – thebluephantom Dec 14 '18 at 11:53
  • @JohnAster You can also refer to the link below for joining big dataframes by iterating through partition data in sequence. https://stackoverflow.com/questions/53524062/efficient-pyspark-join/53720497#53720497 – vikrant rana Dec 14 '18 at 14:05

2 Answers


repartitioning only optimises the join but What I need is join WITHOUT A SHUFFLE

This is not true. Repartitioning does more than just "optimize" the join: it binds a Partitioner to your RDD, which is the key component for a map-side join.
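
For example, a minimal PySpark sketch (my own illustration, assuming sc is an existing SparkContext and the toy data stands in for your real keys): when two pair RDDs share the same partitioner and partition count, their join should be evaluated per partition without shuffling either input again.

num_partitions = 8

# partitionBy binds a (hash) Partitioner to each RDD; these two calls shuffle once each
rdd1 = sc.parallelize([(k, "left_%d" % k) for k in range(100)]).partitionBy(num_partitions)
rdd2 = sc.parallelize([(k, "right_%d" % k) for k in range(100)]).partitionBy(num_partitions)

# both inputs are co-partitioned, so this join should not need another shuffle
# (check the DAG in the Spark UI to confirm)
joined = rdd1.join(rdd2, num_partitions)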

I have made sure that the keys on which join operation has to happen are distributed within the same partition

Spark must know about this. Build your DataFrames with the appropriate APIs so that they have the same Partitioner, and Spark will take care of the rest.
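
At the DataFrame level, a rough sketch of the same idea (my example; df_left, df_right and the partition count are placeholders): repartition both sides on the join key with the same number of partitions, and the join itself should not add another Exchange, because the required partitioning is already satisfied on both children.

n = 200  # placeholder partition count; tune for your data

df1_part = df_left.repartition(n, "joincolumn")    # hash-partitions by the join key (one shuffle here)
df2_part = df_right.repartition(n, "joincolumn")   # same key, same partition count

joined = df1_part.join(df2_part, "joincolumn")
joined.explain()  # expect a SortMergeJoin with no extra Exchange under either side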

shay__
  • In my logic, I got my second dataframe after doing explode on arraytype columns and simple udfs, so their partitioner should be the same by default (right?). Since I can assure that there has been no shuffle, will Spark do the join without a shuffle by itself? Is that what you are implying, sir? – JohnAster Dec 09 '18 at 12:11
  • @JohnAster Not quite. A DataFrame does not have any partitioner by default, so even if one df is just a projection of another, that does not imply a partitioner. You need to **make sure** your dataframes have the same partitioner, not just infer it. – shay__ Dec 09 '18 at 13:48
  • I found bucketed joins: https://issues.apache.org/jira/browse/SPARK-11512. Is this what you are referring to? – JohnAster Dec 10 '18 at 01:55
  • @JohnAster This is one of the techniques, yes. – shay__ Dec 10 '18 at 08:24
  • Although the strategy may be correct, it is not a good answer without an example, or at least references to the documentation on the "appropriate APIs" that you mention. – Tony Nov 27 '19 at 14:20

Just an addition to the previous good answers: if you are joining a big dataframe multiple times throughout your PySpark application, save that table as a bucketed table and read it back in PySpark as a dataframe. This way you can avoid multiple shuffles during the joins, as the data is already pre-shuffled and sorted.

So when Spark chooses a sort merge join on two large dataframes, it will skip the sort and shuffle phases during your join operations (you can confirm it in the Spark UI while looking at the WholeStageCodegen plan).

# Write both dataframes as bucketed, sorted tables, bucketed on the join column.
df_data_1.coalesce(1).write.format('orc').bucketBy(20, 'joincolumn').sortBy("sortcolumn").mode("overwrite").saveAsTable('bucketed_table1')
df_data_2.coalesce(1).write.format('orc').bucketBy(20, 'joincolumn').sortBy("sortcolumn").mode("overwrite").saveAsTable('bucketed_table2')

# Read the bucketed tables back as dataframes.
df_bucket_table_1 = spark.table("bucketed_table1")
df_bucket_table_2 = spark.table("bucketed_table2")

# Disable broadcast joins and prefer sort merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")

# Create aliases for the dataframes.
df1 = df_bucket_table_1.alias('df1')
df2 = df_bucket_table_2.alias('df2')

# Join on the bucketing column; the join should need no shuffle or extra sort.
DfInnerJoin = df1.join(df2, df1.joincolumn == df2.joincolumn, 'inner').select('df1.*')

The above join will have no shuffling, but this is useful only when you have to join the same dataframe multiple times throughout the application.
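
One way to double-check (my addition, not part of the original answer): print the physical plan and confirm there is no Exchange (shuffle) step under the SortMergeJoin when both tables are bucketed and sorted on the join column.

# with matching bucketing, the plan should show a SortMergeJoin without Exchange or extra Sort
DfInnerJoin.explain()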

vikrant rana