
I am using Spark to run an existing Java package which uses java.util.logging.Logger, and I am getting an error:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332)
    at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46)
    at edu.uth.clamp.nlp.main.RunPipelineWithSpark.processFolder(RunPipelineWithSpark.java:271)
    at edu.uth.clamp.nlp.main.RunPipelineWithSpark.process(RunPipelineWithSpark.java:179)
    at edu.uth.clamp.nlp.main.RunPipelineWithSpark.main(RunPipelineWithSpark.java:136)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: java.util.logging.Logger
Serialization stack:
    - object not serializable (class: java.util.logging.Logger, value: java.util.logging.Logger@a23dc07)
    - field (class: edu.uth.clamp.nlp.ner.CRFNameEntityRecognizer, name: logger, type: class java.util.logging.Logger)
    - object (class edu.uth.clamp.nlp.ner.CRFNameEntityRecognizer, edu.uth.clamp.nlp.ner.CRFNameEntityRecognizer@5199fdf9)
    - field (class: edu.uth.clamp.nlp.uima.NameEntityUIMA, name: recognizer, type: class edu.uth.clamp.nlp.ner.CRFNameEntityRecognizer)
    - object (class edu.uth.clamp.nlp.uima.NameEntityUIMA, edu.uth.clamp.nlp.uima.NameEntityUIMA@23a84ec4)
    - writeObject data (class: java.util.ArrayList)

5 Answers


You are trying to serialize a logger instance; make the logger field static or transient so it is not serialized along with the object.
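
A minimal sketch of the static-field option, assuming you can actually edit the class named in the stack trace (the class name is taken from the trace above; if you cannot modify it, marking the existing field transient is the other route):

import java.io.Serializable;
import java.util.logging.Logger;

public class CRFNameEntityRecognizer implements Serializable {
    // A static field is not part of the instance's serialized state, so Spark's
    // closure serializer no longer tries to write the Logger.
    private static final Logger logger =
            Logger.getLogger(CRFNameEntityRecognizer.class.getName());

    // ... rest of the class unchanged ...
}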

Manas D

Spark expects any function passed into an RDD/DStream transformation to be serializable. Since java.util.logging.Logger is not serializable, you should not have logging-related code inside that function. You could replace the logging with a simple println, or you could try the options suggested here:

Apache Spark logging within Scala

Note that logging can stay in the driver code. Also make sure the function does not reference any non-serializable variable defined outside it. To better understand serialization problems caused by closures, learn the concept of closures (doc, doc2).
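
As a small self-contained illustration of that split (the class name DriverOnlyLogging and the local-mode setup are invented for the example): the Logger is used only on the driver, while the code shipped to executors uses println, so the closure captures nothing non-serializable.

import java.util.Arrays;
import java.util.logging.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DriverOnlyLogging {
    // This Logger lives only in the driver JVM; the lambda below never references it.
    private static final Logger driverLog = Logger.getLogger(DriverOnlyLogging.class.getName());

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "driver-only-logging");
        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "b", "c"));

        driverLog.info("Starting job");   // driver-side logging is fine

        // Executor-side code uses println, so the serialized closure contains nothing non-serializable.
        rdd.foreach(x -> System.out.println("processing " + x));

        sc.stop();
    }
}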

Knight71

Try using a @transient lazy val when creating the log object. Better still, create it inside the closure so that Spark itself takes care of this.
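
That annotation is Scala-specific; a rough Java analogue of the same idea is a transient field that is lazily re-created after deserialization. The class and method names below are invented for illustration:

import java.io.Serializable;
import java.util.logging.Logger;

// Invented example class: the transient field is skipped by Java serialization and
// lazily re-created on whichever JVM (driver or executor) first uses it.
public class LazyLoggingComponent implements Serializable {
    private transient Logger logger;   // not written during serialization

    private Logger log() {
        if (logger == null) {          // re-initialize after deserialization
            logger = Logger.getLogger(LazyLoggingComponent.class.getName());
        }
        return logger;
    }

    public void handle(String record) {
        log().info("handling " + record);
    }
}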

DILIP KUMAR

Your code probably looks something like

NameEntityUIMA nameEntity = ...;
JavaRDD<SomeType> rdd = ...;
rdd.foreach(x -> /* code using nameEntity */);

foreach has to serialize its argument to send it to each node; because the argument uses nameEntity, it needs to be serialized too, but it can't be (and due to the design of Java serialization, this is only detected at runtime instead of giving a compilation error). Instead, you want to create the nameEntity on each partition. You could do

JavaRDD<SomeType> rdd = ...;
rdd.foreach(x -> {
    NameEntityUIMA nameEntity = ...;
    /* code using nameEntity */
});

but this creates a new nameEntity for each element of the RDD, which would perform horribly. Instead, use foreachPartition.
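
Here is a self-contained sketch of the foreachPartition approach. Since the NameEntityUIMA setup is elided above, a java.util.logging.Logger stands in for the non-serializable object, and the class name PerPartitionSetup is invented:

import java.util.Arrays;
import java.util.Iterator;
import java.util.logging.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PerPartitionSetup {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "per-partition-setup");
        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "b", "c", "d"), 2);

        // The non-serializable object is created once per partition, on the executor,
        // so it never has to cross the wire as part of the closure.
        rdd.foreachPartition((Iterator<String> records) -> {
            Logger logger = Logger.getLogger(PerPartitionSetup.class.getName());
            while (records.hasNext()) {
                logger.info("processing " + records.next());
            }
        });

        sc.stop();
    }
}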

Alexey Romanov

Logger is not serializable, and most likely you are trying to access it from the executors. I would suggest defining it as lazy:

lazy val logger = ....

However, the drawback is that you then shouldn't use that logger inside the driver. The other, less elegant option is to have a separate logger for the executors.
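
One way to sketch that separate executor-side logger in Java (the ExecutorLog holder below is an invented name): a static field is initialized independently on each JVM when the class loads, and static state is never serialized with a Spark closure.

import java.util.logging.Logger;

// Invented holder class: the static Logger is created once per JVM (driver or executor)
// when the class loads, and static state is never serialized with a Spark closure.
public final class ExecutorLog {
    private static final Logger LOG = Logger.getLogger("executor-side");

    private ExecutorLog() { }

    public static Logger get() {
        return LOG;
    }
}

Executor code would then call something like rdd.foreach(x -> ExecutorLog.get().info("processing " + x)); while the driver keeps its own logger instance.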