
I am trying to do a broadcast join on two tables. The size of the smaller table will vary based on the parameters, but the size of the larger table is close to 2 TB.

What I have noticed is that if I don't set spark.sql.autoBroadcastJoinThreshold to 10 GB, some of these operations do a SortMergeJoin instead of a broadcast join. But the smaller table shouldn't be anywhere near that big. I wrote the smaller table to an S3 folder and it took only 12.6 MB of space.
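
(For reference, this is roughly how the threshold can be raised; a minimal sketch assuming a SparkSession named spark, since the property takes a size in bytes:)

// raise the auto-broadcast threshold to 10 GB for this session
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024 * 1024)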

I did some operations on the smaller table so that its shuffle size shows up in the Spark History Server, and the size in memory seemed to be about 150 MB, nowhere near 10 GB. Also, if I force a broadcast join on the smaller table it takes a long time to broadcast, which leads me to think that the table may not actually be just 150 MB in size.

What would be a good way to figure out the actual size that Spark sees when deciding whether it crosses the value defined by spark.sql.autoBroadcastJoinThreshold?

Venkat Dabri

2 Answers


Look at the SQL tab in the Spark UI. There you will see the DAG of each job plus the statistics that Spark collects.
For each DataFrame, it will show the size as Spark sees it.
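
If you want the same estimate programmatically, the number Spark compares against spark.sql.autoBroadcastJoinThreshold comes from the optimized plan's statistics. A minimal sketch, assuming Spark 2.3+ and that smallDF is a placeholder for the DataFrame on the small side of the join:

// Spark's own size estimate for the small side, in bytes.
// This is the figure compared against spark.sql.autoBroadcastJoinThreshold.
val estimatedSize = smallDF.queryExecution.optimizedPlan.stats.sizeInBytes
println(s"Estimated size in bytes: $estimatedSize")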

By the way, you don't have to set spark.sql.autoBroadcastJoinThreshold to a high number to force Spark to use a broadcast join.
You can simply wrap the small DataFrame with org.apache.spark.sql.functions.broadcast(df) and it will force a broadcast only on that specific join.
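
For example (a minimal sketch; largeDF, smallDF and the join column id are placeholder names):

import org.apache.spark.sql.functions.broadcast

// hint Spark to broadcast only smallDF for this particular join,
// regardless of spark.sql.autoBroadcastJoinThreshold
largeDF.join(broadcast(smallDF), Seq("id"), "inner")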

lev

As mentioned in this question: DataFrame join optimization - Broadcast Hash Join

import org.apache.spark.sql.functions.broadcast
import context.implicits._  // needed for .toDF and the $ column syntax

val employeesDF = employeesRDD.toDF
val departmentsDF = departmentsRDD.toDF

// hint Spark to broadcast the small departments table
val tmpDepartments = broadcast(departmentsDF.as("departments"))

employeesDF.join(tmpDepartments,
   $"depId" === $"id",  // join on employees.depId == departments.id
   "inner").show()
Sanchit Grover