
I have a small table (2k records) and a big table (5 million records). I need to fetch all the data from the small table and only the matching data from the large table, so to achieve this I executed the query below:

select /*+ broadcast(small) */ small.* from small left outer join large

The query returns the correct result, but when I check the query plan it shows a sort-merge join instead of a broadcast hash join. Is there a limitation that the small table can't be broadcast when it is the left table, and what's the way out then?

S Ganguly
2 Answers


Since you're selecting the entire dataset from the small table rather than the large one, Spark won't enforce a broadcast join here. However, if you change the join order or convert the query to an inner join, Spark will happily use a broadcast join.

Eg:

  1. Big-Table left outer join Small-Table -- Broadcast Enabled
  2. Small-Table left outer join Big-Table -- Broadcast Disabled
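A minimal sketch of the enabled cases (case 1 above, plus the inner-join variant), assuming an active `spark` session; the toy `small`/`large` DataFrames are hypothetical stand-ins:

from pyspark.sql.functions import broadcast

small = spark.createDataFrame([(1, 'a')], ['id', 'name'])
large = spark.createDataFrame([(1, 'x')], ['id', 'value'])

# Case 1: big table on the left, small table broadcast on the right.
# The plan shows BroadcastHashJoin ... LeftOuter, BuildRight.
large.join(broadcast(small), 'id', 'left').explain()

# Inner join: either side may be broadcast, so even a hint on the
# left table is honoured (BroadcastHashJoin ... Inner, BuildLeft).
broadcast(small).join(large, 'id', 'inner').explain()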

Reason: Spark ships the small table (also known as the broadcast table) to every executor that holds a partition of the big table. In your case, you need all the data from the small table but only the matching data from the big table. If the small table were broadcast, each executor would see only its own slice of the big table, so Spark could not tell whether a given small-table record was matched on another executor or had no match at all; selecting all the records from the small table would therefore be ambiguous. As a result, Spark won't use a broadcast join in this scenario.
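To see case 2 fall back with the same toy DataFrames, automatic broadcasting has to be switched off first (otherwise Spark would simply auto-broadcast the tiny right side); a sketch under that assumption:

# disable size-based auto-broadcast so only the hint matters
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# small table on the preserved (left) side of a left outer join:
# the broadcast hint can't be honoured, and the plan shows SortMergeJoin
broadcast(small).join(large, 'id', 'left').explain()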

Krishna G

Since you are doing a left join while broadcasting the left table, either change the order of the tables so that the broadcast table is on the right, or change the join type to a right join:

select /*+ broadcast(small)*/ small.* From small right outer join large
select /*+ broadcast(small)*/ small.* From large left outer join small

Example:

from pyspark.sql.functions import broadcast

df = spark.createDataFrame([(1, 'a')], ['id', 'name'])
df1 = spark.createDataFrame([(1, 'a')], ['id', 'name'])

# broadcasting df1 on the right side of a left join -> BroadcastHashJoin
df.join(broadcast(df1),['id'],'left').explain()
#== Physical Plan ==
#*(2) Project [id#0L, name#1, name#5]
#+- *(2) BroadcastHashJoin [id#0L], [id#4L], LeftOuter, BuildRight
#   :- Scan ExistingRDD[id#0L,name#1]
#   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
#      +- *(1) Filter isnotnull(id#4L)
#         +- Scan ExistingRDD[id#4L,name#5]


# broadcasting df1 on the preserved side of a right join can't be honoured, so Spark falls back to SortMergeJoin
df.join(broadcast(df1),['id'],'right').explain()
#== Physical Plan ==
#*(4) Project [id#4L, name#1, name#5]
#+- SortMergeJoin [id#0L], [id#4L], RightOuter
#   :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
#   :  +- Exchange hashpartitioning(id#0L, 200)
#   :     +- *(1) Filter isnotnull(id#0L)
#   :        +- Scan ExistingRDD[id#0L,name#1]
#   +- *(3) Sort [id#4L ASC NULLS FIRST], false, 0
#      +- Exchange hashpartitioning(id#4L, 200)
#         +- Scan ExistingRDD[id#4L,name#5]
notNull
  • Thanks @Shu for the answer, but in my scenario I need all the data from the small table, and the join types you mention give me all the data from the large table, which I don't want. I want the full data from the small table and only the matching data from the large table, and I also want to broadcast so there is no shuffle. Is there any way out? – S Ganguly Jul 05 '20 at 10:39
  • You can increase `spark.sql.autoBroadcastJoinThreshold` to your big table's size (by default it's `10MB`); then a BroadcastHashJoin will be performed (see the snippet after these comments). – notNull Jul 05 '20 at 14:06
  • Thanks again, but that's also not possible, as my big table is more than 10 GB, which I don't think Spark supports broadcasting. Please let me know if there is any other way out. – S Ganguly Jul 05 '20 at 15:32
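For reference, a minimal sketch of the threshold change suggested in the comments (the value is in bytes; the ~1 GB shown here is an arbitrary illustration, and as the comments note a multi-GB broadcast is generally impractical):

# raise the size limit below which Spark auto-broadcasts a table;
# here ~1 GB, expressed in bytes (-1 would disable auto-broadcast)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1024 * 1024 * 1024)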