12

We use a broadcast hash join in Spark when we have one dataframe small enough to fit into memory, i.e. when the size of the small dataframe is below spark.sql.autoBroadcastJoinThreshold. I have a few questions around this.

What is the life cycle of the small dataframe that we hint as broadcast? For how long will it remain in memory? How can we control that?

For example, suppose I have joined a big dataframe with a small dataframe two times using a broadcast hash join. When the first join runs, it will broadcast the small dataframe to the worker nodes and perform the join while avoiding a shuffle of the big dataframe's data.

My question is: for how long will the executors keep a copy of the broadcast dataframe? Will it remain in memory until the session ends? Or will it get cleared once we have taken an action? Can we control or clear it? Or am I just thinking in the wrong direction...

vikrant rana
  • 4,509
  • 6
  • 32
  • 72

3 Answers

17

The answer to your question, at least in Spark 2.4.0, is that the dataframe will remain in memory on the driver process until the SparkContext is stopped, that is, until your application ends.

Broadcast joins are in fact implemented using broadcast variables, but when using the DataFrame API you do not get access to the underlying broadcast variable. Spark itself does not destroy this variable after it uses it internally, so it just stays around.

Specifically, if you look at the code of BroadcastExchangeExec (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala), you can see that it creates a private variable relationFuture which holds the Broadcast variable. This private variable is only used within that class. There is no way for you as a user to get access to it and call destroy on it, and nowhere in the current implementation does Spark call it for you.

Dave DeCaprio
  • 2,051
  • 17
  • 31
  • Thanks. This one is making some sense now. – vikrant rana Jan 22 '19 at 23:45
  • 1
    This Spark bug report has a good discussion of the issue. https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16750674#comment-16750674 – Dave DeCaprio Jan 24 '19 at 12:13
  • Thanks a lot. Do you have any idea about iterative broadcast hash join? I was trying to code that, without success. – vikrant rana Jan 24 '19 at 16:51
  • 1
    I don't, although I may come back to this sometime next month and work on it again. For my use case we worked around the problem for now. – Dave DeCaprio Jan 25 '19 at 17:16
  • Thanks. Please let me know if you get any insight. I would be grateful to you. – vikrant rana Jan 26 '19 at 11:27
  • But is this variable one per logical plan? And Broadcast uses soft/weak references, so I think the JVM will GC the broadcast value when the query is finished. Or how can this private val be reused between query plans? – Grigoriev Nick Sep 03 '21 at 10:12
2

The idea here is to create the broadcast variable before the join so you can easily control it. Without that you can't control these variables; Spark handles them for you.

Example:

from pyspark.sql.functions import broadcast

sdf2_bd = broadcast(sdf2)
sdf1.join(sdf2_bd, sdf1.id == sdf2_bd.id)

The following rules apply to all broadcast variables (whether created automatically in joins or created by hand):

  1. The broadcast data is sent only to the nodes that contain an executor that needs it.
  2. The broadcast data is stored in memory. If not enough memory is available, the disk is used.
  3. When you are done with a broadcast variable, you should destroy it to release memory.
luminousmen
  • 1,971
  • 1
  • 18
  • 24
  • 3
    vikrant is talking about broadcast hint for joins not broadcast variables – Arnon Rotem-Gal-Oz Dec 14 '18 at 23:34
  • 1
    @ArnonRotem-Gal-Oz broadcast joins leverage broadcast variables underneath so same rules apply – Aravind Yarram Dec 15 '18 at 00:30
  • 2
    Yes except step 3 - you don't have a variable handle to destroy so you don't control its life cycle – Arnon Rotem-Gal-Oz Dec 15 '18 at 05:59
  • @Arnon. I agree with you – vikrant rana Dec 15 '18 at 06:40
  • @luminousmen.. could you please elaborate more on your third point.. can you share an example for this? – vikrant rana Dec 15 '18 at 07:01
  • @luminousmen - broadcast on a dataframe is a hint it is not like a variable broadcast see https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala – Arnon Rotem-Gal-Oz Dec 15 '18 at 09:25
  • @luminousmen... Let's assume my smaller dataframe has four columns and I have broadcast this dataframe to the nodes.. in order to control its life-cycle.. do I need to destroy them separately as a broadcast variable??? – vikrant rana Dec 16 '18 at 05:46
  • @Arnon Rotem-Gal-Oz yes, the variable is marked as small enough for use in broadcast joins, but you can manage it anyway (it doesn't matter how Spark creates the logical/physical plan internally) – luminousmen Dec 16 '18 at 15:00
  • @vikrantrana if you want to remove the broadcast variable from both executors and driver you have to use destroy. Also, you can use unpersist - it only removes broadcast variables from the executors – luminousmen Dec 16 '18 at 15:05
  • 2
    @luminousmen you're wrong - calling unpersist() on sdf2_bd (in your example) has no effect because it isn't persisted in the dataframe sense (you can make it persistent- you can call sdf2_bd.cache() twice and you'd see that the second time will give you a warning that it is already cached (but not the first time). calling unpersist on the dataframe releases this. when you mark a dataframe with broadcast it only marks it with a hint which is resolved when spark builds the join. but you can't control the lifecycle – Arnon Rotem-Gal-Oz Dec 16 '18 at 15:22
  • I was trying to perform the iterative broadcast join. What is being discussed here(see link below) https://databricks.com/session/working-with-skewed-data-the-iterative-broadcast... I can perform join in loops.. just wondering that how can I clear the dataframe which had already been broadcast..before I broadcast the next one.. – vikrant rana Dec 17 '18 at 17:06
  • @Arnon Rotem-Gal-Oz- Do you have any clue about implementing a iterative broadcast join? I was able to do a sort merge join in Loop and it works.https://stackoverflow.com/questions/53524062/efficient-pyspark-join/53720497#53720497 – vikrant rana Dec 28 '18 at 12:45
2

Here are some additional findings from research I did regarding the options for broadcasting.

Let's consider the following example:

import org.apache.spark.sql.functions.{lit, broadcast}

val data = Seq(
  (2010, 5, 10, 1520, 1),
  (2010, 5, 1, 1520, 1),
  (2011, 11, 25, 1200, 2),
  (2011, 11, 25, 1200, 1),
  (2012, 6, 10, 500, 2),
  (2011, 11, 5, 1200, 1),
  (2012, 6, 1, 500, 2),
  (2011, 11, 2, 200, 2))

val bigDF = data
            .toDF("Year", "Month", "Day", "SalesAmount", "StoreNumber")
            .select("Year", "Month", "Day", "SalesAmount")

val smallDF = data
            .toDF("Year", "Month", "Day", "SalesAmount", "StoreNumber")
            .where($"Year" === lit(2011))
            .select("Year", "Month", "Day", "StoreNumber")

val partitionKey = Seq("Year", "Month", "Day")
val broadcastedDF = broadcast(smallDF)
val joinedDF = bigDF.join(broadcastedDF, partitionKey)

As expected, the execution plan for joinedDF looks like the following:

== Physical Plan ==
*(1) Project [Year#107, Month#108, Day#109, SalesAmount#110, StoreNumber#136]
+- *(1) BroadcastHashJoin [Year#107, Month#108, Day#109], [Year#132, Month#133, Day#134], Inner, BuildRight, false
   :- LocalTableScan [Year#107, Month#108, Day#109, SalesAmount#110]
   +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, int, false], input[1, int, false], input[2, int, false]))
      +- LocalTableScan [Year#132, Month#133, Day#134, StoreNumber#136]

This would probably be the same without the explicit broadcast as well, since smallDF is quite small and fits within the default broadcast threshold (10 MB).

Now, I would expect to be able to access the broadcast dataframe through the dependencies of joinedDF, so I try to reach it by printing out rdd.id for the broadcastedDF and for all the dependencies of joinedDF via a helper function:

import org.apache.spark.rdd._

def printDependency(rdd: RDD[_], indentation: String = ""): Unit = {
  if (rdd == null)
    return

  println(s"$indentation RDD id: ${rdd.id}")
  rdd.dependencies.foreach { d => printDependency(d.rdd, s"$indentation ") }
}

println(s"Broadcasted id: ${broadcastedDF.rdd.id}")
printDependency(joinedDF.rdd)

//Output
//
// Broadcasted id: 164
//
// RDD id: 169
//   RDD id: 168
//    RDD id: 167
//     RDD id: 166
//      RDD id: 165

Surprisingly, I realized that the broadcast dataframe is not included in, or considered part of, the DAG of joinedDF. This makes sense: once we have broadcast the instance of smallDF we don't want to trace its changes any more, and of course Spark is aware of that.

One way of freeing a broadcast dataset is by using unpersist as shown below:

val broadcastedDF = smallDF.hint("broadcast")
val joinedDF = bigDF.join(broadcastedDF, partitionKey)

broadcastedDF.unpersist()

A second way is by working with the sparkContext API directly, as shown below:

val broadcastedDF = spark.sparkContext.broadcast(smallDF)
val joinedDF = bigDF.join(broadcastedDF.value, partitionKey)
broadcastedDF.destroy() // or unpersist() to remove only the executor copies (async)

This will, however, delete the broadcast instance itself and not the underlying smallDF. The latter is only marked for deletion and is not removed immediately, depending on whether there are additional references to it. This works in combination with the ContextCleaner class and, more specifically, is controlled by its keepCleaning method, which tries to remove RDDs, accumulators, shuffles and checkpoints that are no longer needed, asynchronously during program execution or when the context ends (as already mentioned).

A third way (and the safer one in my opinion) to remove the dependencies of joinedDF that are no longer used is through the methods df.persist(), df.checkpoint(), rdd.persist() and rdd.checkpoint(). All of the mentioned methods end up calling the registerRDDForCleanup or registerForCleanup methods of the ContextCleaner class in order to clean up their parent dependencies.

One obvious question is which one to use and what the differences are. There are two main differences: first, with checkpoint() you can reuse the output data in a second job by loading the data from the same checkpoint directory; second, the dataframe API applies additional optimizations when saving the data, and no such functionality is available in the RDD API.

So the final conclusion is that you can prune the data of the ancestors of your RDDs by calling one of df.persist(), df.checkpoint(), rdd.persist() and rdd.checkpoint(). The pruning occurs during job execution and not just when the context is terminated. Last but not least, you should not forget that these methods are evaluated lazily (with the exception of df.checkpoint(), which is eager by default) and therefore take place only after executing an action.

UPDATE:

It seems that the most efficient way to force freeing memory right away for dataframes/RDDs is to call unpersist, as discussed here. The code would then change slightly to:

val broadcastedDF = smallDF.hint("broadcast")
val joinedDF = bigDF.join(broadcastedDF, partitionKey)
broadcastedDF.unpersist()
abiratsis
  • 7,051
  • 3
  • 28
  • 46
  • below is the link for reference.. sounds interesting.... https://databricks.com/session/working-with-skewed-data-the-iterative-broadcast – vikrant rana May 12 '19 at 00:08
  • 1
    I checked the mentioned video Vikrant, although it seems to me that here https://stackoverflow.com/questions/53524062/efficient-pyspark-join/53720497# you have already implemented a similar scenario, with the difference that you must broadcast the data from EMP_DF2 based on par_id and then join it with innerjoin_EMP. Also, the iterative broadcast is appropriate for data-skew scenarios; I believe it wouldn't be efficient in all cases, since you must use disk storage and replace the default join behavior – abiratsis May 12 '19 at 10:18