
I am going through the Spark programming guide, which says:

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.

Considering the above, what are the use cases of broadcast variables? What problems do broadcast variables solve?

When we create a broadcast variable like the one below, is the variable reference (here, broadcastVar) available on all the nodes in the cluster?

val broadcastVar = sc.broadcast(Array(1, 2, 3))

How long do these variables stay in the memory of the nodes?

Shaido

2 Answers


If you have a huge array that is accessed from Spark closures, for example some reference data, this array will be shipped to each Spark node with the closure. For example, if you have a 10-node cluster with 100 partitions (10 partitions per node), this array will be shipped at least 100 times (10 times to each node).

If you use broadcast instead, it will be distributed once per node using an efficient p2p protocol.

val array: Array[Int] = ??? // some huge array
val broadcasted = sc.broadcast(array)

And some RDD:

val rdd: RDD[Int] = ???

In this case, the array will be shipped with the closure each time:

rdd.map(i => array.contains(i))

whereas with broadcast you'll get a huge performance benefit:

rdd.map(i => broadcasted.value.contains(i))
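On the lifetime question from the original post: broadcast copies stay cached on the executors until they are released. The `Broadcast` handle exposes `unpersist()` and `destroy()` for this. A minimal sketch, assuming a live `SparkContext` named `sc` (e.g. in `spark-shell`):

```scala
// Assumes an existing SparkContext `sc`.
val rdd = sc.parallelize(1 to 100)

val lookup = sc.broadcast(Set(1, 2, 3)) // cached on each executor on first use
val hits = rdd.filter(i => lookup.value.contains(i)).count() // hits == 3

// Drop the cached copies on the executors; the value is
// re-broadcast lazily if the handle is used again.
lookup.unpersist()

// Release all state on driver and executors; the handle
// cannot be used after this call.
lookup.destroy()
```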
Ankur Chavda
Eugene Zhulenev
    Why does each partition on a node need to re-fetch the closure? Why not reuse it from the previous one? – Changming Sun Jun 23 '15 at 09:17
  • @JustinPihony Please could you provide a source? Shouldn't they just deprecate Broadcast variables if this is the case? I don't think what you say is true. – samthebest Dec 11 '15 at 16:06
  • 1
    @samthebest The documentation covers the case well IMO (second paragraph): https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables but if you need the source I can re-dig that up (I forgot) – Justin Pihony Dec 19 '15 at 05:17
  • 1
    @JustinPihony Well if you read the documentation it's pretty clear that it is NOT automatic. It automatically broadcasts data in serialized form, NOT deserialized form, for deserialized form you need to be explicit. If you have a job with 5000 tasks on 5 nodes, and you broadcast a 100 GB HashMap for use as a lookup, by using a BV you will save 1000x the cost of deserializing (and a lot of memory if you have turned off serialization compression). .... – samthebest Dec 19 '15 at 10:26
  • 1
    ..., the actual cost of deserialization itself is extremely memory intensive, I've seen various jobs fall over with OOMs (after some hanging) and reading the ST one can see it happens during some deserialization. Finally descoping 100GB of memory 1000 times in a single JVM is going to cause insane GC. Ergo "This is pretty much what Spark does now anyway... " is incorrect and BVs should still be used explicitly. – samthebest Dec 19 '15 at 10:29
  • 1
    Is it right to say that a broadcast value is similar to Hadoop's distributed cache? The only difference is that the cache contains files, while a broadcast variable is usually a computed value. – Rakshith Apr 29 '16 at 04:36

Broadcast variables are used to send shared data (for example, application configuration) to all nodes/executors.

The broadcast value is cached on all the executors.

Sample Scala code creating a broadcast variable at the driver:

val broadcastedConfig:Broadcast[Option[Config]] = sparkSession.sparkContext.broadcast(objectToBroadcast)

Sample Scala code reading the broadcast variable on the executor side:

val config = broadcastedConfig.value
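Putting the two snippets together, a minimal end-to-end sketch (the names `broadcast-demo` and `threshold` are illustrative, not from the answer; it assumes a local Spark build on the classpath):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("broadcast-demo")
  .master("local[*]")
  .getOrCreate()
val sc = spark.sparkContext

// Hypothetical shared configuration, broadcast once from the driver
// instead of being captured in every task closure.
val objectToBroadcast = Map("threshold" -> "10")
val broadcastedConfig = sc.broadcast(objectToBroadcast)

// Each task reads the executor-local cached copy via .value.
val result = sc.parallelize(Seq(5, 15, 25))
  .filter(i => i > broadcastedConfig.value("threshold").toInt)
  .collect() // Array(15, 25)

spark.stop()
```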