
I'm looking for some help understanding how object/singleton vals are initialized when using the spark-shell.

Test code:

import org.apache.spark.sql.SparkSession

object Consts {
    val f2 = 2
}
object Test extends Serializable {
    val f1 = 1
    println(s"-- init Test singleton f1=${f1} f2=${Consts.f2}")
    def doWorkWithF1(x: Int)  = {
        f1
    }
    def doPartitionWorkWithF1(partitionId: Int, iter: Iterator[Int])  = {
        iter.map(x => f1)
    }
    def doPartitionWorkWithF2(partitionId: Int, iter: Iterator[Int])  = {
        iter.map(x => Consts.f2)
    }
    def main(args: Array[String]) {
        println(s"-- main starting f1=${f1} f2=${Consts.f2}")
        val spark = SparkSession.builder().getOrCreate()
        val rdd = spark.sparkContext.parallelize(List(1,2,3,4))
        rdd.map(doWorkWithF1).foreach(print)
        rdd.mapPartitionsWithIndex(doPartitionWorkWithF1).foreach(print)
        rdd.mapPartitionsWithIndex(doPartitionWorkWithF2).foreach(print)
    }
}

Running:

$ spark-shell --master local[4]
scala> :paste "test.scala"
...
defined object Consts
defined object Test

scala> Test.main(Array())
-- init Test singleton f1=1 f2=2
-- main starting f1=1 f2=2
11110000
23/02/22 21:03:31 ERROR executor.Executor: Exception in task 1.0 in stage 2.0 (TID 9)
java.lang.NullPointerException
        at $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Test$$anonfun$doPartitionWorkWithF2$1.apply$mcII$sp(test.scala:37)
        at $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Test$$anonfun$doPartitionWorkWithF2$1.apply(test.scala:37)
        at $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Test$$anonfun$doPartitionWorkWithF2$1.apply(test.scala:37)
...
  • doWorkWithF1 (using map) works as I expect: the output is 1111.
  • In doPartitionWorkWithF1, the output is not what I expect: it is 0000. Why is val f1 seen as 0 and not 1?
    Asked another way: under what circumstances is an integer val in an object singleton left at its default of 0?
  • In doPartitionWorkWithF2, I assume the NullPointerException is because f2 is null. Why is that?
    Asked another way: under what circumstances is a val in an object singleton left at its default of null? (By "default" I mean the JVM field defaults; see the sketch just after this list.)
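
To be clear about what I mean by the JVM field defaults: until a field's initializer has run, the JVM leaves an Int at 0 and a reference at null. A Spark-free sketch of that (the object name InitOrder is only for illustration):

object InitOrder {
    // snapshot's initializer runs first, before n and str are assigned,
    // so it observes the JVM field defaults: 0 for an Int, null for a reference
    val snapshot = s"during init: n=$n str=$str"
    val n: Int = 42
    val str: String = "ready"
}

// In the REPL, println(InitOrder.snapshot) prints: during init: n=0 str=null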

Changing the declaration of f1 to add lazy

    lazy val f1 = 1

makes doPartitionWorkWithF1 work as I expect: 1111 is the result in the spark-shell.
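
My working mental model of why lazy helps is the rough desugaring below. This is only a sketch, not the code scalac actually emits (the real implementation uses a bitmap field and different locking):

object Test extends Serializable {
    // roughly what `lazy val f1 = 1` behaves like
    private var _f1Initialized = false
    private var _f1: Int = 0
    def f1: Int = this.synchronized {
        if (!_f1Initialized) { _f1 = 1; _f1Initialized = true }
        _f1
    }
    // reading f1 re-runs the initializer on whichever JVM holds the copy,
    // even if that copy's fields started at their defaults, which would
    // explain why the shell now prints 1111
}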

And this is where Spark gets frustrating to work with: if the original version (without lazy) is compiled and run with spark-submit, I get the expected result:

$ /usr/bin/spark-submit --master local[4] --driver-memory 1024m --name "TEST" --class Test test.jar 2> err
-- init Test singleton f1=1 f2=2
-- main starting f1=1 f2=2
111111112222

I really don't like having to write code differently just so it works in the spark-shell, but the shell is so convenient that I do it anyway. These kinds of nuances cost me a lot of time and effort, though. The above is the salient part of a 2000-line program; it took me hours to figure out where in the code the shell was behaving differently from the compiled version.

Mark Rajcok

2 Answers


In spark-shell, Spark somehow ends up serializing the objects themselves instead of just shipping their class definitions to the executors.

In your case, your Consts object is not serializable, so an empty object is sent to the executors and its values come up as null.
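
A Spark-free way to see what the Serializable marker changes is plain Java serialization. This is only an illustration (the object names below are made up for it), not Spark's exact serialization path:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object NotMarked { val f2 = 2 }                      // does not extend Serializable
object Marked extends Serializable { val f2 = 2 }    // extends Serializable

object SerializationCheck {
    def main(args: Array[String]): Unit = {
        val out = new ObjectOutputStream(new ByteArrayOutputStream())
        out.writeObject(Marked)     // works: the singleton can be written
        out.writeObject(NotMarked)  // throws java.io.NotSerializableException
    }
}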

I couldn't find a proper workaround for Spark 2.x, but the fix below works fine with Spark 3:

test.scala

import org.apache.spark.sql.SparkSession

// The change from the question's code: Consts now extends Serializable
// (the annotation just pins a serialization version id)
@SerialVersionUID(123L)
object Consts extends Serializable {
    val f2 = 2
}
object Test extends Serializable {
    val f1 = 1
    println(s"-- init Test singleton f1=${f1} f2=${Consts.f2}")
    def doWorkWithF1(x: Int)  = {
        f1
    }
    def doPartitionWorkWithF1(partitionId: Int, iter: Iterator[Int])  = {
        iter.map(x => f1)
    }
    def doPartitionWorkWithF2(partitionId: Int, iter: Iterator[Int])  = {
        iter.map(x => Consts.f2)
    }
    def main(args: Array[String]) {
        println(s"-- main starting f1=${f1} f2=${Consts.f2}")
        val spark = SparkSession.builder().getOrCreate()
        val rdd = spark.sparkContext.parallelize(List(1,2,3,4))
        rdd.map(doWorkWithF1).foreach(print)
        rdd.mapPartitionsWithIndex(doPartitionWorkWithF1).foreach(print)
        rdd.mapPartitionsWithIndex(doPartitionWorkWithF2).foreach(print)
    }
}

Terminal output:

➜  ~ ~/Downloads/spark-3.3.2-bin-hadoop3/bin/spark-shell --master "local[4]"
23/03/02 03:16:24 WARN Utils: Your hostname, my.local resolves to a loopback address: 127.0.0.1; using 172.17.4.232 instead (on interface en0)
23/03/02 03:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/02 03:16:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://172.17.4.232:4040
Spark context available as 'sc' (master = local[*], app id = local-1677726988083).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_362)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste "test.scala"
Pasting file test.scala...
import org.apache.spark.sql.SparkSession
defined object Consts
defined object Test

scala> Test.main(Array())
-- init Test singleton f1=1 f2=2
-- main starting f1=1 f2=2
11112222
veysiertekin
  • Thank you. I should have posted the code with `object Consts extends Serializable`, but as we both know, that doesn't help Spark2. Is `@SerialVersionUID(123L)` required to get this to work with Spark3? I'm not familiar with that annotation, or when it is required. Can you comment on that? I'm glad to learn that this will work with Spark3. – Mark Rajcok Mar 02 '23 at 17:05
  • Actually, the annotation doesn't make much difference in your case, but you do need to extend `Serializable`. The annotation is for deserialization: the JVM compares serialVersionUID values so it knows the serialized data and the class it is being mapped onto are compatible. More details: https://stackoverflow.com/a/285809/1888799 – veysiertekin Mar 02 '23 at 17:59

I'm trying to follow your post so forgive me if this misses the mark.

Maybe try initializing f2 in Consts with a lazy val:

object Consts {
  lazy val f2 = 2
}

That way f2 is initialized when it is first accessed, regardless of which node accesses it. Like I said, if I'm misunderstanding your goal, I'm sorry.
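
If it helps, you can also see when the initializer actually runs by giving it a side effect (just for illustration, not part of the fix):

object Consts {
  // the println fires the first time f2 is read in a given JVM, so it shows
  // whether the driver or an executor performed the initialization
  lazy val f2 = {
    println("initializing f2 on " + Thread.currentThread().getName)
    2
  }
}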

linuxgx
  • Thank you for your answer. I'm looking for explanations about how object/singleton vals are initialized in the shell. I'm not looking for answers about how to get the code to work in the shell. Also, adding `lazy` to f2 doesn't work in the shell. A NullPointerException still occurs when doPartitionWorkWithF2 is called. – Mark Rajcok Feb 26 '23 at 17:59