5

In Spark, we use broadcast variables to give each machine a read-only copy of a variable. We usually create a broadcast variable outside the closure (such as a lookup table needed by the closure) to improve performance.

We also have a Spark transformation called mapPartitions, which seems to achieve a similar thing (using a shared variable to improve performance). For example, with mapPartitions we can share a database connection across all the records of a partition.

So what's the difference between these two? Can we use them interchangeably just for shared variables?

xuanyue
  • 1,368
  • 1
  • 17
  • 36

2 Answers

6

broadcast is used to ship an object to every worker node. This object is then shared among all partitions on that node (and the value, i.e. the object, is the same on every node in the cluster). The goal of broadcasting is to save on network costs when you use the same data in many different tasks/partitions on the worker node.
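A minimal sketch of that pattern, assuming a running SparkContext `sc` (as in the other examples on this page) and a hypothetical lookup table:

// hypothetical lookup table, created on the driver
val lookup = Map("a" -> 1, "b" -> 2)
val bLookup = sc.broadcast(lookup)          // shipped to each worker once, not with every task

sc.parallelize(Seq("a", "b", "a"), 3)
  .map(k => bLookup.value.getOrElse(k, 0))  // every task on a node reads the same copy
  .collect()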

mapPartitions, in contrast, is a method available on RDDs, and works like map, only on partitions. Yes, you can define new objects, such as a jdbc connection, which will then be unique to each partition. However, you can't share it among different partitions, much less among different nodes.
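For illustration, a minimal sketch of that per-partition connection pattern; the jdbc URL is a placeholder and a running SparkContext `sc` is assumed:

import java.sql.DriverManager

val jdbcUrl = "jdbc:h2:mem:demo"            // placeholder connection string

sc.parallelize(1 to 100, 4).mapPartitions { iter =>
  val conn = DriverManager.getConnection(jdbcUrl)   // one connection per partition
  val out = iter.map { i =>
    // ... use conn to look up or write i here ...
    i
  }.toList                                          // materialize before closing the connection
  conn.close()
  out.iterator
}.count()

Note that the iterator has to be materialized before the connection is closed, because mapPartitions is evaluated lazily.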

KrisP
  • 1,206
  • 8
  • 10
  • I see so the main difference is the level right? broadcast is at node level but mapPartitions is at partitions level. – xuanyue Dec 28 '15 at 22:29
  • 2
    In a sense, yes. However, the usage (in particular, the syntax) is so different that I would hesitate to draw a parallel between the two cases. Usually you broadcast an existing Array of data, but in mapPartitions you create a new object at that level. Btw, broadcast is at the cluster level, not node level. – KrisP Dec 29 '15 at 01:32
4

While the answer provided by KrisP highlights all the important differences, I think it is worth noting that mapPartitions is just a low-level building block behind higher-level transformations, not a method to achieve shared state.

Although mapPartitions can be used to make shared-like state explicit, it is technically not shared (its lifetime is limited to the mapPartitions closure) and there are other means to achieve it. In particular, variables which are referenced inside closures are shared inside a partition. To illustrate that, let's play a little with singletons:

object DummySharedState {
  var i = 0L                 // mutable counter, incremented on every call to get
  def get(x: Any) = {
    i += 1L
    i
  }
}

sc.parallelize(1 to 100, 1).map(DummySharedState.get).max
// res3: Long = 100
sc.parallelize(1 to 100, 2).map(DummySharedState.get).max
// res4: Long = 50
sc.parallelize(1 to 100, 50).map(DummySharedState.get).max
// res5: Long = 2

and a similar thing in PySpark:

  • singleton module dummy_shared_state.py:

    # module-level counter, shared by every call within a single interpreter
    i = 0
    def get(x):
        global i
        i += 1
        return i
    
  • main script:

    from pyspark import SparkConf, SparkContext
    import dummy_shared_state
    
    master = "spark://..."
    conf = (SparkConf()
        .setMaster(master)
        .set("spark.python.worker.reuse", "false"))
    
    sc.addPyFile("dummy_shared_state.py")
    sc.parallelize(range(100), 1).map(dummy_shared_state.get).max()
    ## 100
    sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
    ## 50 
    

Please note that the spark.python.worker.reuse option is set to false. If you keep the default value you'll actually see something like this:

sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 50
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 100
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 150

At the end of the day you have to distinguish between three different things:

  • broadcast variables, which are designed to reduce network traffic and memory footprint by keeping a copy of the variable on the worker instead of shipping it with each task
  • variables defined outside the closure and referenced inside the closure, which have to be shipped with each task and are shared within that task
  • variables defined inside the closure, which are not shared (see the sketch after this list)
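A minimal sketch contrasting those three cases, assuming a running SparkContext `sc` and a hypothetical lookup map:

// a hypothetical lookup table defined on the driver
val lookup = Map(1 -> "a", 2 -> "b")

// 1. broadcast variable: one copy per worker, fetched once and reused by all its tasks
val bLookup = sc.broadcast(lookup)
sc.parallelize(1 to 2).map(k => bLookup.value.get(k)).collect()

// 2. defined outside the closure, referenced inside it: lookup is serialized
//    and shipped with every single task
sc.parallelize(1 to 2).map(k => lookup.get(k)).collect()

// 3. defined inside the closure: created independently in every task, never shared
sc.parallelize(1 to 2).map { k =>
  val local = Map(1 -> "a", 2 -> "b")
  local.get(k)
}.collect()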

On top of that there are some Python specific gotchas related to the usage of persistent interpreters.

Still, there is no practical difference between map (filter or any other transformation) and mapPartitions when it comes to variable lifetime.
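For example, reusing the `DummySharedState` singleton from above, the same experiment expressed through mapPartitions should behave exactly like the map version:

sc.parallelize(1 to 100, 2)
  .mapPartitions(_.map(DummySharedState.get))
  .max
// expected to match the map variant with 2 partitions above (50)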

zero323
  • 322,348
  • 103
  • 959
  • 935
  • This is really helpful. I'm wondering about another difference between map and mapPartitions. map operates on a single row basis, while mapPartitions has access to all the rows sent to that partition through an iterator. Is it possible to have the advantage of access to global variables as per your example above and also all the rows sent to the partition? I assume mapPartition can't use global scope variables as you've shown here. – retrocookie Jan 07 '16 at 06:12
  • @retrocookie `map` is implemented using `mapPartitions` so there is no difference here. I would be careful though. It is more to show what are the implications of keeping persistent interpreter than anything else. – zero323 Jan 07 '16 at 11:45
  • So are globals accessed via mapPartition shared at the node level as opposed to the partition level? – retrocookie Jan 07 '16 at 16:02
  • It is a little bit more complicated than that. If you ask about PySpark then the answer is negative. Worker is a JVM process and it communicates with Python executors using sockets. So there is not global Python out there. – zero323 Jan 07 '16 at 16:11
  • So with default options there's one python interpreter per executor which could be used for multiple partitions right? – retrocookie Jan 07 '16 at 16:16
  • I don't think so. AFAIK each partition should get its own interpreter. It is only reused between tasks. But I could be wrong. One way or another you shouldn't depend on a details of implementation. The only reason I pointed this answer is to show that there is no excessive environment spawning when you import `rpy` :) – zero323 Jan 07 '16 at 16:27
  • The reason I'm so curious is for the use case where you have large objects (possibly R objects) that differ by partition that you want to be used for multiple tasks, global variables and reusing interpreters seems the right way to go. – retrocookie Jan 07 '16 at 17:09
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/100075/discussion-between-zero323-and-retrocookie). – zero323 Jan 07 '16 at 17:10