
I want to know how transient variables are available on the workers. For example, a map task is sent from the driver to an executor by serializing the MapFunction object. The executor deserializes the task and executes it on a partition. Now, if I use a transient variable in that MapFunction, how is it available on the workers, given that it is not serialized and sent to them?
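For reference, this is what plain Java serialization does with a `transient` field: it is skipped when the object is written, and after deserialization it holds its default value (`null` for a reference), because field initializers are not re-run for a `Serializable` class. Static fields are never serialized either, since they belong to the class rather than to an instance. A minimal sketch, not Spark code; `TransientDemo`, `MyTask`, and `roundTrip` are hypothetical names, and the byte-array round trip only stands in for shipping a task from driver to executor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {

    // Hypothetical stand-in for a function object shipped to an executor.
    static class MyTask implements Serializable {
        private static final long serialVersionUID = 1L;
        String config = "sent";                  // regular field: written to the stream
        transient Object session = new Object(); // transient field: skipped
    }

    // Serialize to bytes and read back, standing in for driver -> executor shipping.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MyTask copy = roundTrip(new MyTask());
        System.out.println(copy.config);  // sent
        System.out.println(copy.session); // null
    }
}
```

So a transient instance field really does arrive empty; whatever the executor code sees in its place must come from somewhere other than the serialized bytes.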

Also, see the example at the following link: https://www.mapr.com/blog/how-log-apache-spark

Example:

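```java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class Test {

    transient static SparkSession sparkSession;

    public static void main(String[] args) {

        // Initialize SparkSession on the driver
        sparkSession = SparkSession.builder().appName("Test").getOrCreate();

        Dataset<Row> dataset = sparkSession.read().csv("A.csv");
        dataset.createOrReplaceTempView("TEMP_TABLE");

        Dataset<Row> dataset2 = sparkSession.sql("SELECT * FROM TEMP_TABLE");

        // This lambda is serialized on the driver and executed on the executors
        Dataset<String> stringDataset = dataset2.map((MapFunction<Row, String>) (row) -> {
            Dataset<Row> tempDataset = sparkSession.sql(
                    "SELECT NAME FROM TEMP_TABLE WHERE ID='" + row.getString(0) + "'");
            String temp = tempDataset.first().getString(0);
            return temp;
        }, Encoders.STRING());

        stringDataset.show();
    }
}
```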

In the above example, how was sparkSession resolved on the workers? It was created on the driver, and since it is transient it was not serialized when the closure was sent to the workers. So shouldn't it be null on the workers? It was not. Why?

Since sparkSession is a static variable, it is stored in the class definition. So when that closure is sent to the workers, is the Test class definition also sent along with the serialized closure?
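To see what travels with a serialized lambda, here is a plain-Java sketch (no Spark; `StaticVsCaptured`, `SerSupplier`, `staticValue`, and `demo` are hypothetical names). It serializes one lambda that reads a static field and one that captures a local variable, changes the static field, then deserializes both. The static field's value is not part of the serialized bytes: the deserialized lambda reads whatever that field holds in the JVM that runs it. Only the capturing class's name is written, not its bytecode, so the receiving JVM must already have the class on its classpath. When driver and executor code share one JVM (as in Spark's local mode), that is the very field the driver assigned.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class StaticVsCaptured {

    // Hypothetical stand-in for the question's static sparkSession field.
    static String staticValue;

    // Supplier that is also Serializable, so the lambdas below are serializable.
    interface SerSupplier<T> extends Supplier<T>, Serializable {}

    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) in.readObject();
        }
    }

    // Returns {value seen by the static-reading lambda, value seen by the local-capturing lambda}.
    static String[] demo() throws IOException, ClassNotFoundException {
        staticValue = "set before serialization";
        String local = "set before serialization";

        SerSupplier<String> readsStatic = () -> staticValue; // captures nothing
        SerSupplier<String> readsLocal = () -> local;        // captures the String object

        byte[] staticBytes = serialize(readsStatic);
        byte[] localBytes = serialize(readsLocal);

        // Stand-in for "the receiving JVM holds a different value in its copy of the class".
        staticValue = "changed after serialization";

        SerSupplier<String> staticCopy = deserialize(staticBytes);
        SerSupplier<String> localCopy = deserialize(localBytes);
        return new String[] { staticCopy.get(), localCopy.get() };
    }

    public static void main(String[] args) throws Exception {
        String[] seen = demo();
        System.out.println(seen[0]); // changed after serialization
        System.out.println(seen[1]); // set before serialization
    }
}
```

In a separate executor JVM, the static field would hold whatever that JVM's own code assigned to it, or `null` if nothing did.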

Kiba

1 Answer


I am not sure how lambdas are serialized, but the lambda you created certainly has the reference to the value of sparkSession. Anything used inside a lambda becomes part of it.
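Which values a serializable lambda actually carries can be inspected directly. This plain-Java sketch relies on an OpenJDK implementation detail: the generated class of a serializable lambda has a private `writeReplace()` method returning a `java.lang.invoke.SerializedLambda`, whose `getCapturedArgCount()` and `getCapturedArg(int)` list what was captured. `CapturedArgs`, `SerSupplier`, `staticObject`, and `describe` are hypothetical names. A captured local object shows up as a captured argument; a static field read from a static method does not.

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.function.Supplier;

public class CapturedArgs {

    // Hypothetical stand-in for the question's static sparkSession field.
    static Object staticObject = new Object();

    // Supplier that is also Serializable, so the lambdas below are serializable.
    interface SerSupplier<T> extends Supplier<T>, Serializable {}

    // OpenJDK detail: a serializable lambda's class has a private writeReplace()
    // that returns a SerializedLambda listing the values the lambda captured.
    static SerializedLambda describe(Serializable lambda) throws ReflectiveOperationException {
        Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        return (SerializedLambda) writeReplace.invoke(lambda);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Object localObject = new Object();
        SerSupplier<Object> capturesLocal = () -> localObject; // local is captured
        SerSupplier<Object> readsStatic = () -> staticObject;  // static is read, not captured

        SerializedLambda local = describe(capturesLocal);
        SerializedLambda stat = describe(readsStatic);

        System.out.println(local.getCapturedArgCount());            // 1
        System.out.println(local.getCapturedArg(0) == localObject); // true
        System.out.println(stat.getCapturedArgCount());             // 0
    }
}
```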

  • Thanks for the answer. Yes, the lambda has a reference to sparkSession, but how do the workers have that same reference? Since sparkSession is a static variable, it is stored in the class definition. So when that closure is sent to the workers, is the Test class definition also sent with the serialized closure? – Kiba Aug 31 '16 at 18:12
  • It's not the variable that's sent to the workers; it's the object that variable refers to. So the `Test` class is not relevant, and neither is the variable `sparkSession`. What is relevant is the instance of the `SparkSession` class that is serialized with the lambda. Internally, that instance is held by a hidden member variable of the class implementing the lambda. –  Aug 31 '16 at 18:55
  • Does control come back to the driver to execute the transient variable line, and then go back to the worker to execute the rest? Someone told me this today, but I didn't believe it, so I wanted to make sure whether it really works like that. – Kiba Nov 21 '16 at 19:07
  • As far as I know, the entire lambda executes on the executor. The system relies on the same code (same jars) being available to the driver JVM and the executor JVM (or threads, but in that case it's obvious). (Edit: "execute the variable"? That's not right.) –  Nov 21 '16 at 20:04
  • Thanks for the reply. I read the following at http://stackoverflow.com/questions/29685330/how-to-set-and-get-static-variables-from-spark: The copy of a class in the driver process isn't the copy in the executors. They aren't in the same ClassLoader, or even the same JVM, or even on the same machine. Setting a static variable on the driver does nothing to the other copies, hence it should be null remotely. So how is sparkSession available on the executors? – Kiba Nov 22 '16 at 03:35
    It is true. **Changing** the static variable **after** you create the lambda does nothing. But the serialized lambda itself contains a copy of the STATE of the session object (NOT the variable, the OBJECT it was referencing) as it was at the moment of the lambda's serialization. When the lambda is deserialized in another JVM, a NEW session object is created, and its state is set to exactly the state of the session object in the original JVM. The **variable**, of course, does not exist in the new JVM. The only thing that refers to the new object is the hidden field in the lambda. –  Nov 22 '16 at 13:29
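For an object that a lambda captures, the copy-of-state behavior described in the last comment can be checked in plain Java: serialization snapshots the captured object's fields, and deserialization builds a new object from that snapshot, so later changes to the original are not seen. A minimal sketch, not Spark code; `CopiedState`, `Session`, `SerSupplier`, and `demo` are hypothetical names, and the deserialization happens in the same JVM purely for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class CopiedState {

    // Hypothetical stand-in for the session object a lambda captures.
    static class Session implements Serializable {
        private static final long serialVersionUID = 1L;
        String state;
        Session(String state) { this.state = state; }
    }

    // Supplier that is also Serializable, so the lambda below is serializable.
    interface SerSupplier<T> extends Supplier<T>, Serializable {}

    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) in.readObject();
        }
    }

    // Returns {original object, object rebuilt from the serialized lambda}.
    static Session[] demo() throws IOException, ClassNotFoundException {
        Session original = new Session("before");
        SerSupplier<Session> lambda = () -> original; // captures the object, not the variable
        byte[] shipped = serialize(lambda);           // the object's state is copied here

        original.state = "after";                     // later change to the original object

        SerSupplier<Session> received = deserialize(shipped);
        return new Session[] { original, received.get() };
    }

    public static void main(String[] args) throws Exception {
        Session[] pair = demo();
        System.out.println(pair[0] == pair[1]); // false: a new object
        System.out.println(pair[1].state);      // before
    }
}
```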