9

Is there any way (or any plans) to turn Spark distributed collections (RDDs, DataFrames or Datasets) directly into broadcast variables without the need for a collect? The public API doesn't seem to have anything "out of the box", but can something be done at a lower level?

I can imagine there is some 2x speedup potential (or more?) for this kind of operation. To explain what I mean in detail, let's work through an example:

val myUberMap: Broadcast[Map[String, String]] =
  sc.broadcast(myStringPairRdd.collect().toMap)

someOtherRdd.map(someCodeUsingTheUberMap)

This causes all the data to be collected to the driver, and then the data is broadcast. This means the data is sent over the network essentially twice.

What would be nice is something like this:

val myUberMap: Broadcast[Map[String, String]] =
  myStringPairRdd.toBroadcast((a: Array[(String, String)]) => a.toMap)

someOtherRdd.map(someCodeUsingTheUberMap)

Here Spark could bypass collecting the data altogether and just move the data between the nodes.

BONUS

Furthermore, there could be a Monoid-like API (a bit like combineByKey) for situations where the .toMap, or whatever operation on Array[T], is expensive but can possibly be done in parallel. For example, constructing certain Trie structures can be expensive, so this kind of functionality could open up a lot of scope for algorithm design. This CPU activity could also run while the IO is running, whereas the current broadcast mechanism is blocking (i.e. all IO, then all CPU, then all IO again).
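To make the Monoid-like idea concrete, here is a minimal sketch in plain Scala. The object `PartialMapMerge` and its method names are made up for illustration, and nested `Seq`s stand in for RDD partitions. The two functions have the same shape as the `seqOp` and `combOp` arguments that `RDD.aggregate` takes, so the per-partition work could already be parallelised that way, although the merged result would still land on the driver before being broadcast.

```scala
// Hypothetical sketch: build a partial lookup per partition, then merge the partials.
object PartialMapMerge {
  type Lookup = Map[String, String]

  // seqOp: fold one record into a partition-local partial result.
  def seqOp(acc: Lookup, kv: (String, String)): Lookup = acc + kv

  // combOp: merge two partial results; it has to be associative.
  def combOp(left: Lookup, right: Lookup): Lookup = left ++ right

  // Each inner Seq plays the role of one partition (an assumption of this sketch).
  def buildFromPartitions(partitions: Seq[Seq[(String, String)]]): Lookup =
    partitions
      .map(_.foldLeft(Map.empty[String, String])(seqOp)) // per-partition work
      .foldLeft(Map.empty[String, String])(combOp)       // merge step
}
```

With a real pair RDD the equivalent call would be something like `myStringPairRdd.aggregate(Map.empty[String, String])(PartialMapMerge.seqOp, PartialMapMerge.combOp)`. That only moves the CPU work to the executors; it does not remove the trip through the driver.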

CLARIFICATION

Joining is not the (main) use case here; it can be assumed that I use the broadcast data structure sparsely. For example, the keys in someOtherRdd by no means cover the keys in myUberMap, but I don't know which keys I need until I traverse someOtherRdd, AND suppose I use myUberMap multiple times.

I know that all sounds a bit vague, but the point is for more general machine learning algorithm design.

samthebest

2 Answers

6

While this is an interesting idea, I will argue that, although theoretically possible, it has very limited practical applications. Obviously I cannot speak for the PMC, so I cannot say whether there are any plans to implement this type of broadcasting mechanism at all.

Possible implementation:

Since Spark already provides a torrent broadcasting mechanism, whose behavior is described as follows:

The driver divides the serialized object into small chunks and stores those chunks in the BlockManager of the driver.

On each executor, the executor first attempts to fetch the object from its BlockManager. If it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or other executors if available.

Once it gets the chunks, it puts the chunks in its own BlockManager, ready for other executors to fetch from.

it should be possible to reuse the same mechanism for direct node-to-node broadcasting.
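The chunk-and-fetch behavior quoted above can be illustrated with a toy model. This is a sketch in plain Scala under loud assumptions: `TorrentSketch` and `BlockStore` are hypothetical stand-ins, not Spark's `TorrentBroadcast` or `BlockManager`, and there is no networking or serialization involved.

```scala
// Toy model of torrent-style broadcast; none of these names are Spark classes.
object TorrentSketch {
  // Split a serialized payload into fixed-size chunks (chunkSize must be > 0).
  def chunk(payload: Array[Byte], chunkSize: Int): Vector[Array[Byte]] =
    payload.grouped(chunkSize).toVector

  // Hypothetical per-node block store, keyed by chunk index.
  final class BlockStore(val name: String) {
    private val blocks = scala.collection.mutable.Map.empty[Int, Array[Byte]]
    def put(id: Int, data: Array[Byte]): Unit = blocks.update(id, data)
    def get(id: Int): Option[Array[Byte]] = blocks.get(id)
  }

  // For each chunk: try the local store, then the first peer that has it,
  // and cache it locally so other nodes can fetch it from here later.
  def fetchAll(local: BlockStore, peers: Seq[BlockStore], numChunks: Int): Array[Byte] =
    (0 until numChunks).flatMap { id =>
      val data = local.get(id)
        .orElse(peers.iterator.map(_.get(id)).collectFirst { case Some(d) => d })
        .getOrElse(sys.error(s"chunk $id not available on any peer"))
      local.put(id, data)
      data.toSeq
    }.toArray
}
```

In this model nothing requires the initial chunks to sit in the driver's store; they could just as well be seeded on executors. What the model leaves out is who tells each node which chunk ids exist.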

It is worth noting that this approach cannot completely eliminate driver communication. Even though blocks could be created locally you still need a single source of truth to advertise a set of blocks to fetch.

Limited applications

One problem with broadcast variables is that they are quite expensive. Even if you can eliminate the driver bottleneck, two problems remain:

  • Memory required to store the deserialized object on each executor.
  • Cost of transferring the broadcast data to every executor.

The first problem should be relatively obvious. It is not only about direct memory usage but also about GC cost and its effect on overall latency. The second one is rather subtle. I partially covered this in my answer to Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark, but let's discuss this further.

From a network traffic perspective, broadcasting a whole dataset is pretty much equivalent to creating a Cartesian product. So if the dataset is large enough for the driver to become a bottleneck, it is unlikely to be a good candidate for broadcasting, and a targeted approach like a hash join can be preferred in practice.
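A back-of-the-envelope model of that comparison, as a sketch only. `TrafficModel` and both formulas are simplifying assumptions of mine: they ignore compression, data locality and the extra collect to the driver, and treat both numbers as rough upper bounds.

```scala
// Rough traffic model; the formulas are illustrative assumptions, not measurements.
object TrafficModel {
  // Broadcast: every executor receives one full copy of the dataset.
  def broadcastBytes(smallBytes: Long, numExecutors: Int): Long =
    smallBytes * numExecutors

  // Shuffle-based join: each record of both sides crosses the network at most once.
  def shuffleBytes(smallBytes: Long, largeBytes: Long): Long =
    smallBytes + largeBytes
}
```

Under this model, broadcasting 1 GB to 100 executors moves about 100 GB, while shuffling it against a 20 GB dataset moves at most about 21 GB.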

Alternatives:

There are some methods which can be used to achieve results similar to direct broadcast and to address the issues enumerated above, including:

  • Passing data via a distributed file system.
  • Using a replicated database collocated with the worker nodes.
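
As a rough illustration of the first alternative, here is a sketch of a lookup that is loaded at most once per JVM from a shared path. Everything here is an assumption made for the example: the `SharedLookup` object, the tab-separated file format, and a path that is readable through the local file API (for instance a network mount). A real distributed file system would normally be read through its own client API instead.

```scala
import scala.io.Source

// Hypothetical helper: one cached lookup per JVM, loaded from a shared file.
object SharedLookup {
  // Parse "key<TAB>value" lines into a map; malformed lines are skipped.
  def load(path: String): Map[String, String] = {
    val src = Source.fromFile(path, "UTF-8")
    try {
      src.getLines()
        .map(_.split("\t", 2))
        .collect { case Array(k, v) => k -> v }
        .toMap
    } finally src.close()
  }

  // Cache a single copy so tasks running in the same JVM share it.
  // Note: this sketch caches only the first path it is asked for.
  private var cached: Option[Map[String, String]] = None

  def getOrLoad(path: String): Map[String, String] = synchronized {
    cached.getOrElse {
      val m = load(path)
      cached = Some(m)
      m
    }
  }
}
```

Inside a job this would typically be called once per partition, for example at the top of a `mapPartitions` function, so that each executor reads the file itself instead of receiving the data from the driver.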
zero323
  • I've given an upvote for pointing out indeed for regular joins a shuffle join is often better. But not accepting since my use case is more general than that - I didn't say I only wanted to do regular single joins/cartesians. I guess since that's what 99% of people do with broadcasts it was a fair assumption. I've updated my OP to be more clear. Thanks. – samthebest Jul 30 '16 at 08:45
  • Oh, I get that. Thing is that as long as we don't use off-heap structures GC will eat us alive much faster than network traffic. Or at least this is what I've seen so far. And if we start to tune for very large objects then we get a performance hit on the rest. So the only application I can think of is small objects and tuning for close-to-real-time processing. But not streaming because we cannot elegantly destroy and rebroadcast. – zero323 Jul 30 '16 at 10:33
  • I do think it would be useful to be able to instantiate broadcast data from the executors directly. It would give flexibility. For example, if a broadcast variable was the result of querying an external jdbc data source that was very slow. One could use the parallel load capability of a spark cluster to instantiate multiple queries on the source server, which could run in parallel threads. – ThatDataGuy Oct 31 '16 at 14:48
  • @zero323 Awesome answer, still curious about the _advantage_ of going through driver. So the savings might be limited if you bypass driver, but is there any advantage to going through driver? – allstar Dec 05 '19 at 18:19
0

I don't know whether this can be done for an RDD, but you can do it for a DataFrame:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions

val df:DataFrame = your_data_frame

val broadcasted_df = functions.broadcast(df)

Now you can use the variable broadcasted_df, and it will be broadcast to the executors.

Make sure the broadcasted_df DataFrame is not too big to be sent to the executors.

broadcasted_df will be broadcast in operations such as:

other_df.join(broadcasted_df)

In this case the join() operation executes faster because every executor has one partition of other_df and the whole of broadcasted_df.

As for your question, I am not sure you can do what you want. You cannot use one RDD inside the map() method of another RDD, because Spark doesn't allow transformations inside transformations. In your case you need to call collect() to create a map from your RDD, because only an ordinary Map object can be used inside map(); an RDD cannot be used there.

  • I don't think this actually causes the dataframe to be distributed without a collect "under the hood" (though that's a hunch). Rather it marks the dataframe for broadcast if we use it for a join. It's worth noting that joining is not the (main) use case here, which I will make clear in my OP by editing. – samthebest Jul 30 '16 at 08:35
  • Not a hunch: it collects the `DataFrame` without converting to local types and broadcasts it back. I am pretty sure I have an answer that describes this somewhere here, but it could be on chat. One way or another it just hides `collect`. – zero323 Jul 30 '16 at 10:09