
I want to use user-defined variables inside filter and map functions in Spark, but I am currently getting an error when I try to do that.

EDIT: I am running this code in a Zeppelin notebook, so I don't have to create a separate class.

My code looks something like this:

val some_value = "qwerty"
val some_other_value = "x.y.z"

data.filter(r => r.getString("a.b.c").equals(some_value))
.map(r => (r.getString(some_other_value)))

Please note that "data" here is an RDD containing JSONs.

I am getting the following error:

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:341)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:340)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.filter(RDD.scala:340)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$fa17825793f04f8d2edd8765c45e2a6c$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:202)......
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:747)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:711)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:704)
at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:312)
at org.apache.zeppelin.scheduler.Job.run(Job.java:171)
at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@7f166a30)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: c, type: class org.apache.spark.SparkContext)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@65b97194)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@5e9afdf2)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@1d72f175)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@7b44290b)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC@2fd0142d)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC@1812605b)
    - field (class: $iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC@40020ce9)
    - field (class: $iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC, $iwC$$iwC$$iwC@3db9c1c)
    - field (class: $iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC)
    - object (class $iwC$$iwC, $iwC$$iwC@6968a929)
    - field (class: $iwC, name: $iw, type: class $iwC$$iwC)
    - object (class $iwC, $iwC@1e8d42e6)
    - field (class: $line25.$read, name: $iw, type: class $iwC)
    - object (class $line25.$read, $line25.$read@6b4256a)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $VAL5449, type: class $line25.$read)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@7d6cf781)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@f4d2716)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@d8c3a34)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC@7bc0d5a5)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC@74ef7040)
    - field (class: $iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC@7e55be5d)
    - field (class: $iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC, $iwC$$iwC$$iwC@2f59915e)
    - field (class: $iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC)
    - object (class $iwC$$iwC, $iwC$$iwC@36faa9d0)
    - field (class: $iwC, name: $iw, type: class $iwC$$iwC)
    - object (class $iwC, $iwC@54f2aef7)
    - field (class: $line385.$read, name: $iw, type: class $iwC)
    - object (class $line385.$read, $line385.$read@3d8590f3)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$fa17825793f04f8d2edd8765c45e2a6c$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $VAL5516, type: class $line385.$read)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$fa17825793f04f8d2edd8765c45e2a6c$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$fa17825793f04f8d2edd8765c45e2a6c$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@419dfc19)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$fa17825793f04f8d2edd8765c45e2a6c$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$fa17825793f04f8d2edd8765c45e2a6c$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$fa17825793f04f8d2edd8765c45e2a6c$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$fa17825793f04f8d2edd8765c45e2a6c$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@6e994c6d)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$2e9cf4ebd66898e1aa2d2fdd9497ea7$$$$C$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$fa17825793f04f8d2edd8765c45e2a6c$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$2e9cf4ebd66898e1aa2d2fdd9497ea7$$$$C$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 123 more

EDIT 2: I finally found the solution: wrap the whole code in a Scala function and pass the variables in as parameters. That way the closures capture only the function's String arguments instead of the notebook's outer object.

val some_value = "qwerty"
val some_other_value = "x.y.z"

import org.apache.spark.rdd.RDD

def func(value_1: String, value_2: String): RDD[String] = {
  data.filter(r => r.getString("a.b.c").equals(value_1))
    .map(r => r.getString(value_2))
}

val new_data = func(some_value, some_other_value)

Another method is to define functions that return the closures, so that each closure captures only its String argument rather than the surrounding notebook scope:

def myFilter(e: String) = (r: org.apache.avro.generic.GenericRecord) => r.getString("a.b.c").equals(e)
def myGenerator(f: String) = (r: org.apache.avro.generic.GenericRecord) => r.getString(f)

val p = data.filter(myFilter(some_value)).map(myGenerator(some_other_value))
Nitish
    Please [edit] to show the full class and stacktrace – OneCricketeer Aug 25 '16 at 16:10
  • The trouble usually with Spark is that it tries to serialize everything in the function graph. Is `some_value` or `some_other_value` defined in a class which is not serializable? Or extends/implements something which is not serializable? – Saket Aug 25 '16 at 16:18
  • @Saket - I am defining them as standalone variables and not inside any class, so I am not sure if that's the problem, but I am new to Spark and still learning the tricks. I ran the above code as is. Is there any way to serialize a variable? – Nitish Aug 26 '16 at 05:43
  • @cricket_007 : edited with full error message containing the stack trace – Nitish Aug 26 '16 at 05:55
  • Are you running this from Spark shell? You should be able to run the query if you inline the vars into the lambda itself. – Saket Aug 26 '16 at 09:07
  • Your stacktrace points to SparkContext, it is being referred to by one of your blocks in the notebook. Is all the code from your notebook pasted here? – Saket Aug 26 '16 at 22:39
  • Hi, I updated my answer with an edit that uses a case class. Case classes are serializable by default – Ram Ghadiyaram Aug 27 '16 at 16:46

2 Answers


The org.apache.spark.SparkException: Task not serializable exception means that you are using a reference to an instance of a non-serializable class inside a transformation.

Beware of closures that use fields or methods of an outer object: these will pull a reference to the whole object into the closure.

For example:

NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");
rdd.map(s -> notSerializable.doSomething(s)).collect();

Here are some ideas to fix this error:

  1. Make the class serializable.
  2. Declare the instance only within the lambda function passed to map (see the Scala sketch after this list).
  3. Make the NotSerializable object static and create it once per machine.
  4. Call rdd.foreachPartition and create the NotSerializable object in there, like this:

rdd.foreachPartition(iter -> {
    NotSerializable notSerializable = new NotSerializable();

    // ... now process the iterator
});
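
For option 2, a minimal Scala sketch could look like the following; the NotSerializable class and its doSomething method are hypothetical stand-ins used only for illustration:

import org.apache.spark.SparkContext

// Hypothetical non-serializable helper, for illustration only
class NotSerializable {
  def doSomething(s: String): String = s.toUpperCase
}

def example(sc: SparkContext): Array[String] = {
  val rdd = sc.textFile("/tmp/myfile")
  // The helper is created inside the lambda, so it is constructed on the
  // executor and never has to be serialized from the driver
  rdd.map { s =>
    val helper = new NotSerializable()
    helper.doSomething(s)
  }.collect()
}

Note that this creates one helper per record; option 4 (creating it once per partition) is usually the cheaper variant.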

Also, look at Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects

Tip: you can use a JVM parameter to see detailed information about serialization:

-Dsun.io.serialization.extendedDebugInfo=true in SPARK_JAVA_OPTS
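
One way to pass this flag is through the Spark configuration when you build the context yourself; this is a minimal sketch, assuming you control SparkConf creation (in Zeppelin the equivalent properties would go into the interpreter settings):

// Enable extended serialization debug info on driver and executors
val conf = new org.apache.spark.SparkConf()
  .set("spark.driver.extraJavaOptions", "-Dsun.io.serialization.extendedDebugInfo=true")
  .set("spark.executor.extraJavaOptions", "-Dsun.io.serialization.extendedDebugInfo=true")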

  • SerializationDebugger:

    SPARK-5307 introduced SerializationDebugger, and Spark 1.3.0 was the first version to use it. It adds the serialization path to a NotSerializableException: when a NotSerializableException is encountered, the debugger visits the object graph to find the path towards the object that cannot be serialized, and constructs information to help the user find that object. For example:

    Serialization stack:
        - object not serializable (class: testing, value: testing@2dfe2f00)
        - field (class: testing$$anonfun$1, name: $outer, type: class testing)
        - object (class testing$$anonfun$1, <function1>)

EDIT

I'll try to address the problem directly here.

Since we are going to run our Spark application on the JVM, an object can only be serialized if its class explicitly extends the Serializable interface. In Scala, when you declare a case class, it automatically extends the Serializable interface.

1. Case class approach:

object Test extends App {
  case class MyInputForMapAndFilter(somevalue: String, someothervalue: String)

  val some_value = "qwerty"
  val some_other_value = "x.y.z"
  val myInputForMapAndFilter = MyInputForMapAndFilter(some_value, some_other_value)

  data.filter(r => r.getString("a.b.c").equals(myInputForMapAndFilter.somevalue))
    .map(r => r.getString(myInputForMapAndFilter.someothervalue))
}

A case class in Scala is serializable by default.

2. Another approach you can try: declare the vals as @transient:

@transient val some_value = "qwerty"
@transient val some_other_value = "x.y.z"
data.filter(r => r.getString("a.b.c").equals(some_value))
  .map(r => (r.getString(some_other_value)))

Please try this and let me know the output, and feel free to ask more questions if you run into problems.

Ram Ghadiyaram
  • Thanks a lot for the detailed post and suggestions. I tried your recommendation, but I am still getting a serialization error. I implemented exactly the way you described! – Nitish Aug 28 '16 at 13:37
  • you implemented it in the same way as the above code snippet? – Ram Ghadiyaram Aug 28 '16 at 15:28
  • are you using Java serialization or Kryo? – Ram Ghadiyaram Aug 28 '16 at 15:31
  • Yes, I ran the code in the same way as above. I am using a Zeppelin notebook to run these commands, so will I have to configure Java serialization or Kryo separately? – Nitish Aug 29 '16 at 08:02
  • OK, can you wrap all the variables into one object and pass that to the filter? – Ram Ghadiyaram Aug 29 '16 at 08:14
  • Hi, please see my updated answer and try it. Change: enclosed in an object – Ram Ghadiyaram Aug 29 '16 at 11:24
  • updated answer: the change is that the variables are declared as `transient` – Ram Ghadiyaram Aug 29 '16 at 16:14
  • The transient solution didn't work either, but I have finally found the solution and updated it above. Thanks a lot for the tips :) – Nitish Aug 30 '16 at 05:00
  • initially I suggested your updated method 2, i.e. like this: `rdd.foreachPartition(iter -> { NotSerializable notSerializable = new NotSerializable(); // ...Now process iterator ` – Ram Ghadiyaram Aug 30 '16 at 09:17
  • I thought calling it within the closure was not working either; that's where I suggested the `case` and `transient` solutions – Ram Ghadiyaram Aug 30 '16 at 09:18
  • For your information, the [case solution](http://www.cakesolutions.net/teamblogs/demystifying-spark-serialisation-error) and [transient](http://stackoverflow.com/questions/37206108/spark-1-6-1-task-not-serializable-when-evaluating-a-classifier-on-a-dataframe) approaches are inspired by these links – Ram Ghadiyaram Aug 30 '16 at 09:34

What you need to do is use a broadcast variable, which you can then fetch inside your map and reduce functions.

For example, to create the broadcast variable:

val broadcastString = sc.broadcast("my string")

And in your map and reduce functions you can get the broadcast string using:

val myStr = broadcastString.value
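
Applied to the code in the question, a minimal sketch of this approach might look like the following (assuming data and its getString accessor behave as in the question):

val bSomeValue = sc.broadcast("qwerty")
val bSomeOtherValue = sc.broadcast("x.y.z")

// Read the broadcast values inside the closures via .value
val result = data.filter(r => r.getString("a.b.c").equals(bSomeValue.value))
  .map(r => r.getString(bSomeOtherValue.value))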

See here for more: http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

Saif Charaniya
  • I am still getting the same error : `bString_1: org.apache.spark.broadcast.Broadcast[String] = Broadcast(55) bString_2: org.apache.spark.broadcast.Broadcast[String] = Broadcast(56) bString_a: String = WebSessionStartEvent bString_b: String = body.customProperties.Queue.value org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner$.clean....` – Nitish Aug 26 '16 at 05:44
  • sorry for the jumbled error message, I'm not able to format it. – Nitish Aug 26 '16 at 05:50