
I have a Spark SQL query that goes like this:

SELECT /*+ BROADCASTJOIN (sbg_published.sk_e2e_web_all_vis) */
       a.* 
FROM 
       sbg_published.sk_e2e_web_all_vis a
LEFT JOIN 
       sbg_published.web_funnel_detail_v4 b
       ON a.col1 = b.col1

I am running this query using spark.sql(). The first table has around 1 million records and the second has around 1.5 billion records.

I am trying to force Spark to use a broadcast join, but instead it adopts a sort-merge join.
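One way to confirm which strategy Spark actually picked is to inspect the physical plan rather than the DAG. Below is a minimal sketch, assuming a SparkSession named spark; note that the hint references the alias a, since Spark's join hints take the relation name or alias:

# Sketch: run the query and inspect the physical plan.
# Look for "BroadcastHashJoin" vs. "SortMergeJoin" in the output.
df = spark.sql("""
    SELECT /*+ BROADCAST(a) */
           a.*
    FROM sbg_published.sk_e2e_web_all_vis a
    LEFT JOIN sbg_published.web_funnel_detail_v4 b
         ON a.col1 = b.col1
""")
df.explain(mode="formatted")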

Following are the Spark parameters I have used; a sketch of how they are applied follows the list.

"spark.sql.autoBroadcastJoinThreshold" = "4048576000"
"spark.sql.broadcastTimeout" = "100000"
"spark.sql.shuffle.partitions" = 500
"spark.sql.adaptive.enabled" = "true"
"spark.sql.adaptive.coalescePartitions.enabled" = "true"
"spark.sql.adaptive.autoBroadcastJoinThreshold" ="4048576000"
"spark.sql.join.preferSortMergeJoin" = "false"
"spark.shuffle.io.maxRetries"="10"
"spark.dynamicAllocation.enabled"="true"
"spark.shuffle.service.enabled"="true"
"spark.shuffle.compress"="true"
"spark.shuffle.spill.compress"="true"
"spark.driver.maxResultSize"="0"

This is the DAG: [DAG screenshot]

I then also tried this parameter:

"spark.sql.join.preferSortMergeJoin" = "false"

This made the sort-merge join go away; Spark adopted a shuffle hash join instead.

I am using Spark 3.2.
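Spark 3.x also supports per-query join strategy hints (BROADCAST, MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL), so a shuffle hash join can be requested for just this query instead of disabling sort-merge joins session-wide. A minimal sketch, again assuming a SparkSession named spark:

# Sketch: request a shuffle hash join for this query only.
df = spark.sql("""
    SELECT /*+ SHUFFLE_HASH(a) */
           a.*
    FROM sbg_published.sk_e2e_web_all_vis a
    LEFT JOIN sbg_published.web_funnel_detail_v4 b
         ON a.col1 = b.col1
""")
df.explain()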

Thanks in advance!

Sankar

1 Answer


Besides the "spark.sql.autoBroadcastJoinThreshold" setting, Spark has a hard broadcast size limit of 8GB. You can't force Spark to broadcast a table once it exceeds 8GB. So you can try to resolve it by:

  1. Rewriting the SQL to broadcast the small table (a sketch follows this list).
  2. Rewriting the SQL as a union of smaller joins.
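A minimal sketch of option 1 with the DataFrame API's broadcast() function (table and column names taken from the question). One caveat worth verifying: a broadcast hash join builds its hash table from the broadcast side, and an outer join cannot broadcast its preserved side, so the question's LEFT JOIN (small table on the left) may need restructuring before any broadcast hint can be honored:

from pyspark.sql.functions import broadcast

# Sketch of option 1: mark the ~1M-row table for broadcasting explicitly.
small = spark.table("sbg_published.sk_e2e_web_all_vis")
large = spark.table("sbg_published.web_funnel_detail_v4")

# broadcast() overrides Spark's size estimate, but the hard 8GB
# broadcast limit described above still applies.
# Inner join shown for illustration; the original LEFT JOIN preserves
# the small table's rows, which rules out broadcasting that side.
joined = large.join(broadcast(small), on="col1")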
Gary Li