
I have a global config object in my Spark app:

object Config {
  var lambda = 0.01
}

and I set the value of lambda according to the user's input:

object MyApp {
  def main(args: Array[String]) {
    Config.lambda = args(0).toDouble
    ...
    rdd.map(_ * Config.lambda)
  }
}

I found that the modification does not take effect in the executors: the value of lambda there is always 0.01. I guess a modification in the driver's JVM does not affect the executors' JVMs.

Is there another solution?

I found a similar question on Stack Overflow:

how to set and get static variables from spark?

In @DanielL.'s answer, he gives three solutions:

  1. Put the value inside a closure to be serialized to the executors to perform a task.

But I don't know how to write such a closure or how it gets serialized to the executors. Could anyone give me a code example?

  2. If the values are fixed or the configuration is available on the executor nodes (lives inside the jar, etc.), then you can have a lazy val, guaranteeing initialization only once.

What if I declare lambda as a lazy val? Will the modification in the driver take effect in the executors? Could you give me a code example?

  3. Create a broadcast variable with the data.

I know this approach, but doesn't it also need a local Broadcast[_] variable that wraps the Config object? For example:

val config = sc.broadcast(Config)

and then use config.value.lambda in the executors, right?

1 Answer

  1. Put the value inside a closure
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Config { var lambda = 0.01 }

object SOTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("StaticVar"))
    val r = sc.parallelize(1 to 10, 3)
    Config.lambda = 0.02
    mul(r).collect.foreach(println)
    sc.stop()
  }
  def mul(rdd: RDD[Int]) = {
    // Copy the current value into a local val: the local val is captured
    // by the map closure and shipped to the executors, whereas a direct
    // reference to Config.lambda on an executor would read that JVM's own copy
    val l = Config.lambda
    rdd.map(_ * l)
  }
}
  2. lazy val for only-once initialisation
import org.apache.spark.{SparkConf, SparkContext}

object SOTest {
  def main(args: Array[String]) {
    // lazy val: initialised at most once, on first access
    lazy val lambda = args(0).toDouble
    val sc = new SparkContext(new SparkConf().setAppName("StaticVar"))
    val r = sc.parallelize(1 to 10, 3)
    r.map(_ * lambda).collect.foreach(println)
    sc.stop()
  }
}
  3. Create a broadcast variable with the data
import org.apache.spark.{SparkConf, SparkContext}

object Config { var lambda = 0.01 }

object SOTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("StaticVar"))
    val r = sc.parallelize(1 to 10, 3)

    Config.lambda = 0.04
    // Broadcast the plain Double, not the Config object itself
    val bc = sc.broadcast(Config.lambda)
    r.map(_ * bc.value).collect.foreach(println)

    sc.stop()
  }
}

Note: you shouldn't pass the Config object into sc.broadcast() directly; Spark would try to serialise Config before transferring it to the executors, but your Config is not serialisable. Another thing worth mentioning: broadcast variables don't fit your situation well here, because you are only sharing a single value.
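
If your config really holds many values rather than one, a workable variant is to make the whole config a serialisable value and broadcast that. This is just a minimal sketch; the Settings case class is a hypothetical name of mine, and case classes are serialisable by default:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical multi-field config; case classes are serialisable by default
case class Settings(lambda: Double, offset: Double)

object SOTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("StaticVar"))
    val r = sc.parallelize(1 to 10, 3)

    val bc = sc.broadcast(Settings(lambda = 0.04, offset = 1.0))
    r.map(x => x * bc.value.lambda + bc.value.offset).collect.foreach(println)

    sc.stop()
  }
}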

– yjshen
  • Thank you so much! What if my config class has many variables? In the first solution, should I declare Config as a class and create a new Config instance? How do I pass it to the executors? – user2848932 May 12 '15 at 09:11
  • Is this correct? `def mul(rdd: RDD[Int]) = { val l = new Config(); l.lambda = 0.02; rdd.map(_ * l.lambda) }` – user2848932 May 12 '15 at 09:13
  • @user2848932, you should make your Config extend `Serializable` first if you want to do that – yjshen May 12 '15 at 09:21
  • After extending `Serializable`, my code will work, right? – user2848932 May 12 '15 at 09:41
  • Thanks @YijieShen. Could you elaborate more on [2]? Is lazy val initialized just once for the Spark app or once per executor or once per partition? Could you throw some more light on this? – Yash Feb 23 '18 at 19:21
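
Regarding the last comment: as far as I know, a lazy val declared on an object is initialised at most once per JVM that touches it, i.e. once on the driver (if accessed there) and once in each executor process, not once per partition. A minimal sketch to observe this yourself; the println and app name are mine, and the "initialising" line appears in each executor's log:

import org.apache.spark.{SparkConf, SparkContext}

object Config {
  // Runs at most once per JVM: once per executor, not once per partition
  lazy val lambda = {
    println("initialising lambda in this JVM")
    0.01
  }
}

object LazyInitCheck {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("LazyInit"))
    // With 6 partitions spread over a few executors, each executor's log
    // shows the "initialising" line once, however many partitions it runs
    sc.parallelize(1 to 12, 6).map(_ * Config.lambda).collect.foreach(println)
    sc.stop()
  }
}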