
Any idea why the whole object B needs to be serialized?

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object A{

  def main(args:Array[String])= {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd=sc.makeRDD(Seq(1,2,3,4,5,6,7))
    val b=new B
    b.add(rdd)
  }
}
class B {

  val s="456"
  def add=(rdd:RDD[Int])=>{
    rdd.map(e=>e+" "+s).foreach(println)
  }
}

It fails with an "object not serializable" exception:

Serialization stack:
- object not serializable (class: B, value: B@1fde4f40)
- field (class: B$$anonfun$add$1, name: $outer, type: class B)
- object (class B$$anonfun$add$1, <function1>)
- field (class: B$$anonfun$add$1$$anonfun$apply$1, name: $outer, type: class B$$anonfun$add$1)
- object (class B$$anonfun$add$1$$anonfun$apply$1, <function1>)
lzh

1 Answer


See the linked question "Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects". What your syntax

def add=(rdd:RDD[Int])=>{
  rdd.map(e=>e+" "+s).foreach(println)
}

really means is

def addReal(): RDD[Int] => Unit = (rdd: RDD[Int]) => {
  val rddFunc: Int => String = e => e + " " + s
  rdd.map(rddFunc).foreach(println)
}

Here rddFunc uses s, which is a field of the enclosing B instance (it is really this.s). The closure therefore captures a reference to the whole B instance, which is the $outer field in your serialization stack, and B is not serializable.

Besides the options listed in the linked answer, you can also change your add to

def add(rdd:RDD[Int]) = {
  val localS = s
  rdd.map(e => e + " " + localS).foreach(println)
}

The main trick is introducing the local variable localS, so that the function passed to map captures only that string rather than the whole B.
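If you want to check the difference without Spark, here is a minimal sketch using plain Java serialization. Holder, capturesThis, capturesLocal and ClosureCaptureDemo are hypothetical names made up for this illustration, and plain ObjectOutputStream is only a stand-in for what Spark does when it ships a task.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for class B: deliberately not Serializable.
class Holder {
  val s = "456"

  // Reads the field s, i.e. this.s, so the closure keeps a reference to this.
  def capturesThis: Int => String = e => e.toString + " " + s

  // Copies the field into a local first, so the closure keeps only a String.
  def capturesLocal: Int => String = {
    val localS = s
    e => e.toString + " " + localS
  }
}

object ClosureCaptureDemo {
  // Returns true if plain Java serialization accepts the value.
  def serializes(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(value)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val h = new Holder
    println(serializes(h.capturesThis))
    println(serializes(h.capturesLocal))
  }
}
```

The first check should fail because the function drags the non-serializable Holder along with it, while the second should pass because only the local string is captured.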

Side note: in the code above I also changed the signature of your add from

def add():Function1[RDD[Int], Unit]

that you had to

def add(rdd:RDD[Int]):Unit

In your case there is no benefit in add returning a function that you then call immediately.
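To make the two signatures concrete, here is a small Spark-free sketch. It uses List[Int] in place of RDD[Int], and the names AddSignatureDemo, addAsFunction and addAsMethod are invented for the example.

```scala
object AddSignatureDemo {
  // Parameterless method that returns a function, like the original add.
  def addAsFunction: List[Int] => Int = (xs: List[Int]) => xs.sum

  // Ordinary method that takes the argument directly, like the rewritten add.
  def addAsMethod(xs: List[Int]): Int = xs.sum

  def main(args: Array[String]): Unit = {
    val xs = List(1, 2, 3)
    // Sugar for addAsFunction.apply(xs): a Function1 is built first, then applied.
    println(addAsFunction(xs))
    // Direct method call, no intermediate function object.
    println(addAsMethod(xs))
  }
}
```

Both calls look the same at the call site and give the same result; the first one just creates a Function1 on every call before applying it.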

SergGr
  • Yes, but the problem is not due to the difference between a function and a method. "localS" works, but I'm still not clear why the whole object B gets serialized when "s" is captured. – lzh Jan 10 '18 at 10:57
  • @lzh, 1) Yes, that difference is not important to your question. It is just a small inefficiency. 2) I'm not sure what answer about `s` would satisfy you. This is just the way the Scala compiler works. The obvious benefit of this approach is simplicity: the compiler doesn't have to analyze which fields and/or methods are used and which are not. If any of them is used, it captures the whole `this` and uses it to access everything. This is also more memory-efficient in most cases (you always capture one `this` instead of many fields). There is no hand-crafted optimization for your particular (rare) case. – SergGr Jan 10 '18 at 11:01
  • SergGr, I have understood it this time, thanks for your detailed answer. – lzh Jan 10 '18 at 11:29