6

As far as I know, when Spark performs a broadcast join it first collects the smallest (broadcast) RDD to the driver to make a broadcast variable from it, and only then uploads it to each target node.

Sometimes this leads to driver out-of-memory errors, if the broadcast RDD is larger than spark.driver.memory.

The question: why does it work this way? It would be more efficient to just shuffle the broadcast data between the target nodes, because the amount of data to shuffle is the same, but we could avoid overflowing the driver.
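For reference, the pattern in question looks like this (a minimal Scala sketch; the input paths and sizes are hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-join").getOrCreate()

// Hypothetical inputs: `big` is large, `small` fits comfortably in memory.
val big   = spark.read.parquet("/data/big")    // e.g. ~100 GB
val small = spark.read.parquet("/data/small")  // e.g. ~2 GB

// The broadcast hint makes Spark collect `small` to the driver, build a
// broadcast variable from it, and ship it to every executor -- the collect
// step is what can exhaust spark.driver.memory.
val joined = big.join(broadcast(small), "key")
```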

Example: say you have 3 nodes, each holding a 1 GB piece of the data to broadcast, and each node has 1 GB/s throughput.

Spark approach: each node has to upload its piece of data (1 GB) to the driver and download the broadcast variable (3 × 1 GB = 3 GB), so each node transfers 4 GB in total, which takes 4 s.

Shuffle approach: each node has to upload its 1 GB to the 2 other nodes and download 1 GB from each of them. Again, the total amount is 4 GB and it takes 4 s.
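A quick back-of-the-envelope check of those numbers (plain Scala, using the values from the example; the variable names are mine):

```scala
val nodes              = 3
val pieceGb            = 1.0  // each node holds 1 GB of the broadcast data
val throughputGbPerSec = 1.0  // per-node link speed

// Spark approach: upload own piece to the driver, download the full variable.
val sparkTransfer   = pieceGb + nodes * pieceGb                       // 1 + 3 = 4 GB
// Shuffle approach: send own piece to each peer, receive theirs.
val shuffleTransfer = (nodes - 1) * pieceGb + (nodes - 1) * pieceGb   // 2 + 2 = 4 GB

println(f"Spark:   ${sparkTransfer / throughputGbPerSec}%.0f s per node")   // 4 s
println(f"Shuffle: ${shuffleTransfer / throughputGbPerSec}%.0f s per node") // 4 s
```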

Lars Triers
  • How can you be sure that the memory overflow comes from that? – eliasah Oct 28 '16 at 11:19
  • @eliasah Because I have seen out-of-memory errors on the driver many times, and the stack trace clearly shows that the cause is creating the broadcast variable from the RDD being broadcast. The only thing that helps is increasing spark.driver.memory. – Lars Triers Oct 28 '16 at 11:59
  • There are lots of reasons for an OOME; we can't narrow it down unless we see "some code". – eliasah Oct 28 '16 at 12:00
  • @eliasah It's easy to reproduce: set spark.driver.memory to 1g and then try df1.join(broadcast(df2)), where df1 is 100 GB and df2 is 2 GB. You'll get an OOM on the driver 100% of the time. – Lars Triers Oct 28 '16 at 12:02
  • You don't need to broadcast a big DataFrame. Have you read this: http://stackoverflow.com/a/35257145/3415409 ? – eliasah Oct 28 '16 at 12:04
  • @eliasah I don't get your point. I am using the broadcast(..) function in exactly the right way, df1.join(broadcast(df2)), which is actually the way suggested in the answer you linked. – Lars Triers Oct 28 '16 at 12:11
  • @eliasah And I am actually broadcasting the smaller df2, which is only 1/50 the size of the big one (df1). – Lars Triers Oct 28 '16 at 12:12
  • Possible duplicate of [How to transform RDD, Dataframe or Dataset straight to a Broadcast variable without collect?](http://stackoverflow.com/questions/38329738/how-to-transform-rdd-dataframe-or-dataset-straight-to-a-broadcast-variable-with) – Oct 28 '16 at 14:36

3 Answers

0

Firstly, a broadcast join is used for joining a big table with an extremely small table.

Secondly, if you used a shuffle instead of collecting the small df (table) back to the driver and then broadcasting it, you might only notice the small df being shuffled, but in fact the big df would be shuffled at the same time as well, which is quite time consuming.
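One way to see this is to compare the physical plans with and without the broadcast (a minimal sketch; the tiny DataFrames and the local master are just for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("join-plans").master("local[*]").getOrCreate()
import spark.implicits._

// Tiny stand-in DataFrames.
val big   = (1 to 1000000).toDF("key")
val small = (1 to 100).toDF("key")

// With the hint: BroadcastHashJoin -- only `small` is moved; `big` is not shuffled.
big.join(broadcast(small), "key").explain()

// With auto-broadcast disabled and no hint: SortMergeJoin --
// BOTH sides are shuffled (repartitioned by the join key), including `big`.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
big.join(small, "key").explain()
```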

Kevin
0

"It is more efficient to just shuffle broadcast data between target nodes, because amount of data to shuffle is the same but we can avoid driver overflow. -- that right, spark team is working on that: https://issues.apache.org/jira/browse/SPARK-17556 "Currently in Spark SQL, in order to perform a broadcast join, the driver must collect the result of an RDD and then broadcast it. This introduces some extra latency. It might be possible to broadcast directly from executors."

fjolt
-2

It is not correct. Spark doesn't use broadcasting for RDD joins.

Spark may use broadcasting for DataFrame joins, but it shouldn't be used to handle large objects. It is better to use a standard hash join for that.
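To make the distinction concrete, a minimal sketch (the tiny datasets are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("rdd-join").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// RDD joins always shuffle both sides; there is no automatic broadcast variant.
val rdd1 = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
val rdd2 = sc.parallelize(Seq((1, "x"), (2, "y")))
rdd1.join(rdd2).collect()  // shuffled (cogroup-based) join

// A map-side "broadcast join" for RDDs has to be written by hand:
// collect the small side to the driver (memory permitting) and broadcast it.
val smallMap = sc.broadcast(rdd2.collectAsMap())
val joined = rdd1.flatMap { case (k, v) =>
  smallMap.value.get(k).map(w => (k, (v, w)))
}
joined.collect()
```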