9

I have a class like this:

public class Test {
    private static String name;

    public static String getName() {
        return name;
    }

    public static void setName(String name) {
        Test.name = name;
    }

    public static void print() {
        System.out.println(name);
    }

}

Inside my Spark driver, I'm setting the name like this and calling the print() command:

public final class TestDriver{

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("TestApp");
        // ...
        // ...
        Test.setName("TestName");
        Test.print();
        // ...
    }
}

However, I'm getting a NullPointerException. How do I pass a value to the global variable and use it?

vaquar khan
diplomaticguru
  • From your code this is completely unrelated to Spark. The master is a program like any other and I don't see the problem. It has to be on some of the omitted code. – Daniel Langdon Apr 16 '15 at 20:53
  • @DanielL, I tried this on the standard java application and it worked, therefore, I assume the problem is with the parallelization of the tasks and distribution behavior of Spark framework. – diplomaticguru Apr 16 '15 at 21:52
  • Can you put the entire stack trace? What I'm saying is that the problem is probably in a line you didn't put above... – Daniel Langdon Apr 16 '15 at 22:29
  • With "Global Variable" do you mean some value set once on the driver and used across all workers? or some mutable shared structure that is set and updated as the job progresses? – maasg Apr 17 '15 at 08:45
  • Also note that your example might not be a minimal set reproducing the issue. Most probably, the `Test.print()` is being called within a closure for some Spark operation. – maasg Apr 17 '15 at 08:46
  • Why do you need static variables in Spark? Could you please explain your use case? – vaquar khan Sep 07 '20 at 04:37

4 Answers

25

Ok, there are basically two ways to get a value known to the master out to the executors:

  1. Put the value inside a closure to be serialized to the executors to perform a task. This is the most common one and very simple/elegant. Sample and doc here.
  2. Create a broadcast variable with the data. This is good for immutable data of a big size, so you want to guarantee it is sent only once. Also good if the same data is used over and over. Sample and doc here.
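The mechanism behind option 1 can be seen without Spark at all. As a minimal plain-Java sketch (no Spark dependency; all names are made up for illustration), a value captured by a serializable closure travels with it through serialization and deserialization, which is exactly what happens when Spark ships a task to an executor:

```java
import java.io.*;
import java.util.function.Function;

public class ClosureDemo {
    // A serializable function type, mimicking Spark's serializable Function interfaces.
    interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    public static void main(String[] args) throws Exception {
        final String name = "TestName";                      // value known on the "driver"
        SerFunction<String, String> f = s -> name + ":" + s; // captured by the closure

        // Serialize the closure, as Spark would when shipping a task...
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(f);

        // ...and deserialize it, as an executor would on the remote side.
        ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()));
        @SuppressWarnings("unchecked")
        SerFunction<String, String> remote = (SerFunction<String, String>) in.readObject();

        // The captured value arrived with the closure; no static state was needed.
        System.out.println(remote.apply("line1"));
    }
}
```

Note that this is why Spark requires the values you capture to be serializable: they are copied into each task, not shared.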

No need to use static variables in either case. But, if you DO want to have static values available on your executor VMs, you need to do one of these:

  1. If the values are fixed or the configuration is available on the executor nodes (lives inside the jar, etc), then you can have a lazy val, guaranteeing initialization only once.
  2. You can call mapPartitions() with code that uses one of the 2 options above, then store the values on your static variable/object. mapPartitions is guaranteed to run only once for each partition (much better than once per line) and is good for this kind of thing (initializing DB connections, etc).
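The second option can be sketched in plain Java (hypothetical names, no Spark dependency): the function you would hand to mapPartitions() performs a guarded, once-per-JVM initialization of the static state before processing its partition's rows:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionInitDemo {
    // Stands in for an expensive resource (DB connection, parsed config, ...).
    static volatile String connection;
    static final AtomicInteger initCount = new AtomicInteger();

    // What you would pass to mapPartitions(): initialize once, then process rows.
    static Iterator<String> handlePartition(Iterator<String> rows) {
        if (connection == null) {
            synchronized (PartitionInitDemo.class) {
                if (connection == null) {          // double-checked: runs once per JVM
                    connection = "db-connection";
                    initCount.incrementAndGet();
                }
            }
        }
        List<String> out = new ArrayList<>();
        rows.forEachRemaining(r -> out.add(connection + " -> " + r));
        return out.iterator();
    }

    public static void main(String[] args) {
        // Simulate three partitions handled by the same executor JVM.
        for (List<String> partition : Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c"), Arrays.asList("d"))) {
            handlePartition(partition.iterator()).forEachRemaining(System.out::println);
        }
        System.out.println("initialized " + initCount.get() + " time(s)");
    }
}
```

Even though three partitions are processed, the expensive initialization runs once, which is the point of doing it per partition rather than per line.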

Hope this helps!

P.S.: As for your exception: I just don't see it in that code sample; my bet is that it is occurring elsewhere.


Edit for extra clarification: The lazy val solution is simply Scala, no Spark involved...

object MyStaticObject
{
  lazy val MyStaticValue = {
     // Call a database, read a file included in the Jar, do expensive initialization computation, etc
     4
  }
} 

Since each executor corresponds to a JVM, MyStaticObject will be initialized once the classes are loaded. The lazy keyword guarantees that the MyStaticValue variable will only be initialized the first time it is actually requested, and will hold that value from then on.
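For the Java classes in the question, a close analogue of Scala's lazy val is the initialization-on-demand holder idiom, where the JVM's class-loading rules guarantee the initializer runs at most once per JVM, thread-safely (a sketch with made-up names):

```java
public class LazyHolderDemo {
    static int initCount = 0; // only here to observe how many times init runs

    static class MyStaticObject {
        private static class Holder {
            // Runs once, on first reference to Holder; the JVM serializes class init.
            static final int VALUE = expensiveInit();
        }

        static int get() { return Holder.VALUE; }

        private static int expensiveInit() {
            // e.g. read a file bundled in the jar, query a DB, heavy computation...
            initCount++;
            return 4;
        }
    }

    public static void main(String[] args) {
        System.out.println(MyStaticObject.get());
        System.out.println(MyStaticObject.get()); // second call reuses the cached value
        System.out.println("initialized " + initCount + " time(s)");
    }
}
```

As with the Scala version, each executor JVM would run the initializer once on first use, with no coordination code of your own.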

Daniel Langdon
  • DanielL - You are a star! Thank you for your well explained options. This is exactly what I was after. Many thanks :-) – diplomaticguru Apr 17 '15 at 16:13
  • @DanielL. could you please give an example code for your first solution? and could you give an example about lazy val solution – user2848932 May 11 '15 at 14:28
  • Made an edit adding links to the right documentation with examples and added the lazy val example at the end. Cheers! – Daniel Langdon May 11 '15 at 14:56
  • I might be a bit late but… is there any difference between using a `lazy val` and `mapPartitions` in terms of the number of times the initialization code is called? For `mapPartitions` it is clear that it's called for each partition, but what about the `lazy val` case? Is it still called once per partition or, rather, once per executor? – Alberto Aug 16 '16 at 11:09
  • The latter. Depending on input size and config, you might have say 100 tasks and 5 executors, which means you would initialize 20 times more than strictly required. Nevertheless, `mapPartitions` is still a good answer if you need to build the right lambda from the master, static init is an issue, etc. – Daniel Langdon Aug 17 '16 at 14:57
  • Hi Daniel, I am running a spark streaming application where on executor based on RDD data I do some DB operation like writing to DB and reading from DB using a client/executor side DB library. client/executor side DB library claims that it does client/executor side caching. I want your opinion, if this cache will be long-lived on executor for a given spark application, meaning different jobs/batches running at different times on this executor (assume they all belong to same spark application) can use/update this cache? – sunillp Oct 21 '16 at 06:02
  • We do see that a executor is making just one connection to the DB across different jobs/batches of a spark application running on that executor. – sunillp Oct 21 '16 at 06:02
  • That would depend entirely on the internal implementation of your DB library and how often it creates new connections, but you should at least expect one per executor, as those are going to be independent JVMs. – Daniel Langdon Dec 20 '16 at 18:00
  • @DanielLangdon: Is `mapPartitions` good for Spark Streaming? It would start creating DB connection(s) for each mini-batch. – CᴴᴀZ Dec 29 '16 at 08:31
  • Not sure. You might want to try a static connection pool for that, depending on batch size. – Daniel Langdon Jan 03 '17 at 14:26
  • How can i pass the broadcast variable to another class like Function class.? – junsid Apr 06 '17 at 12:15
  • mapPartitions() may not be executed on all executors, so the static var may not be initialized. – petertc Jan 05 '18 at 03:54
  • seeking a quick clarification for the above comment @DanielLangdon, is the lazy val initialized once per Spark app (on the driver) or once per executor? – Yash Feb 23 '18 at 20:02
  • It is not complicated. It is a standard Scala lazy val. It is initialized only once per JVM. Spark uses one JVM per executor (if I remember; I've been on a different job for 2 years now). – Daniel Langdon Feb 25 '18 at 04:31
  • This is why people should always post code here. The links now go to the default Spark documentation page, and there are no examples of closures provided. – supriyo_basak Feb 05 '20 at 13:52
11

The copy of your class in your driver process isn't the copy in your executors. They aren't in the same ClassLoader, or even the same JVM, or even on the same machine. Setting a static variable on the driver does nothing to the other copies, hence you find it null remotely.

Sean Owen
  • @SeanOwen, how do I handle this? Any suggestion? The same code works on the Hadoop MapReduce framework. – diplomaticguru Apr 16 '15 at 21:57
  • @SeanOwn and @DanielL, Okay so I'm able to pass and retain the value by calling the function with flatMap() method. e.g `JavaRDD wordsE = lines .flatMap(new FlatMapFunction() { @Override public Iterable call(String s) { Test.setName("TestName") Test.print();` – diplomaticguru Apr 17 '15 at 01:49
  • However, these functions will be called for every single line found in the RDD. How do I only call it just once? Basically, I'm looking for something equivalent to setup() that is found in MapReduce framework. – diplomaticguru Apr 17 '15 at 01:57
  • Ah, well, at least there is an RDD there now! I still think you should provide a better sample of the code if you want to see where the exception happens, but now I see where you are going with this, will post an answer... – Daniel Langdon Apr 17 '15 at 12:59
1

I would like to add one more approach; it only makes sense if you have a small number of variables that can be passed at runtime as arguments.

Spark configuration: --conf "spark.executor.extraJavaOptions=-DcustomField=${value}", and when you need the data in transformations you can call System.getProperty("customField");
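A self-contained sketch of the read side (the property is set in code here to stand in for the -D flag that spark.executor.extraJavaOptions would pass to each executor JVM; customField is just an example name):

```java
public class JvmOptionDemo {
    public static void main(String[] args) {
        // In a real job this value would arrive via:
        //   --conf "spark.executor.extraJavaOptions=-DcustomField=prod"
        // We set it programmatically so the sketch runs standalone.
        System.setProperty("customField", "prod");

        // Inside a transformation running on the executor:
        String value = System.getProperty("customField", "default");
        System.out.println("customField=" + value);
    }
}
```

Since the flag is part of each executor's JVM options, the value is available to every task on that executor without any serialization, which is what makes this workable for a handful of simple settings.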

you can find more details here

Note: the approach discussed above does not make sense when you have a significant number of variables. In those cases, I would prefer @Daniel Langdon's approaches.

kavetiraviteja
1

I would like to add one more point to DanielL's answer.

When you declare a variable with the static keyword, the JVM initializes it during class loading. So if you create a jar, the initial values of static fields in a Java/Scala class are baked into the jar, and workers can use them directly. However, if you change the value of a static field in the driver program, workers only see the initial value from the jar; your changed value will not be reflected. You would need to ship a new jar or manually copy the updated class to all executors.

vaquar khan