I have a Spark program that reads a relatively big dataframe (~3.2 TB) with two columns, id and name, and another relatively small dataframe (~20k entries) with a single column, id.

What I'm trying to do is take both the id and the name from the big dataframe for rows whose id appears in the small dataframe.

I was wondering what would be an efficient solution here, and why. Several options I had in mind:

  1. Broadcast join the two dataframes
  2. Collect the small dataframe as an array of strings, broadcast it, and then filter the big dataframe with isin against that array (both options are sketched below)
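
For concreteness, here is a minimal sketch of both options in Scala. The dataframe names, the input paths, and the assumption that id is a string are mine, not part of the question:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder.appName("filter-by-ids").getOrCreate()

// Hypothetical inputs -- paths and the string type of id are assumptions
val bigDF   = spark.read.parquet("/path/to/big")   // ~3.2 TB: id, name
val smallDF = spark.read.parquet("/path/to/small") // ~20k rows: id

// Option 1: broadcast join -- smallDF is shipped to every executor and
// Catalyst plans a hash join against the broadcast copy
val joined = bigDF.join(broadcast(smallDF), Seq("id"))

// Option 2: collect the ids to the driver, then filter with isin
val ids = smallDF.select("id").collect().map(_.getString(0))
val filtered = bigDF.filter(bigDF("id").isin(ids: _*))
```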

Are there any other options that I didn't mention here?

I'd appreciate it if someone could also explain why one solution is more efficient than the other.

Thanks in advance

Gideon
  • Please see http://stackoverflow.com/questions/40320441/difference-between-sc-broadcast-and-broadcast-function-in-spark-sql – Ram Ghadiyaram Nov 21 '16 at 13:32
  • @RamPrasadG So let's see if I understand it correctly: if I choose option 1, it will broadcast the small table and then perform a hash join between the big dataframe and the small one, while in option 2 it will broadcast the small dataframe but run a loop join? If that's correct, it sounds like option 1 is better since the lookup will be faster. Is that right? – Gideon Nov 21 '16 at 13:39

1 Answer

AFAIK, it all depends on the size of the data you are handling and on the performance you need:

  • If you use the broadcast function, the default size limit for your small dataframe is 10 MB (set via `spark.sql.autoBroadcastJoinThreshold`; see my answer); you can increase or decrease it based on your data. Also, the broadcast data becomes part of the SQL execution plan, which gives the Catalyst optimizer a hint to do further optimization (a short sketch follows this list). Also see my answer here.

  • Whereas a broadcast shared variable (the one you would use with isin) doesn't have the above advantage.
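
For illustration, a minimal sketch of tuning that threshold and verifying the broadcast actually happens. The 50 MB value is arbitrary, and the names reuse the sketch from the question:

```scala
import org.apache.spark.sql.functions.broadcast

// Raise the auto-broadcast threshold (default 10 MB, value in bytes);
// setting it to -1 disables automatic broadcasting entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024)

// Or force the broadcast regardless of the threshold via the hint
val joined = bigDF.join(broadcast(smallDF), Seq("id"))

// The physical plan should show a broadcast hash join rather than a
// sort-merge join if the broadcast took effect
joined.explain()
```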

Please see my answer at the link in my comment above.

Ram Ghadiyaram
  • Option 1 has flexibility: if your dataset does not fit, the `canBroadcast` method in `SparkStrategies.scala` (on the framework side) will apply a different strategy for better performance. – Ram Ghadiyaram Nov 21 '16 at 13:55