1

I recently turned on write ahead logs for our Spark Streaming application and I am getting serialization exceptions for log4j (shown below). I honestly have no idea what is causing this, however I expect there is some configuration setting that I need to change (I have already set the checkpoint directory). The streaming instance is just reading JSON from the kafka topic.

java.io.NotSerializableException: org.apache.log4j.Logger
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:177)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1144)
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:172)
    at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:190)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:289)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:180)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1.aroundReceive(JobGenerator.scala:84)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
f_puras
  • 2,521
  • 4
  • 33
  • 38
masmithd
  • 101
  • 1
  • 4
  • Maybe there is a `Logger` instance in some class, defined as an object member. Try finding it and make it `static`. – f_puras Jul 31 '15 at 16:59
  • @f_puras could you please change your comment to an answer? this is in 99,(9)% OP's problem and it will make life easier for people looking at unanswered questions. – Mateusz Dymczyk Jul 31 '15 at 17:51
  • @Mateusz You're right. I thought someone else would take the cake. Thanks for noticing. – f_puras Jul 31 '15 at 19:49
  • @f_puras it was an issue related to Apache Spark rather than the logger, see answer below for more information. – masmithd Aug 10 '15 at 19:08
  • Ok. Good you found out and documented it here (+1). – f_puras Aug 10 '15 at 19:11

2 Answers2

1

Your error message indicates that an instance of log4j's Logger class is used as part of your classes. The standard way to using Loggers is creating a static class member as:

private static final Logger LOG = Logger.getLogger(MyClass.class);

According to the error message quoted, a Logger instance appears to be used as an object member instead. As such, it is subject to serialization, which fails because Loggers cannot be serialized (and there is no point in doing so). You should check your application's classes for non-static usages of Logger members, and turn them into statics to resolve the issue.

See Why do we declare Loggers static final?.

f_puras
  • 2,521
  • 4
  • 33
  • 38
0

I'm doing this for a Spark Streaming application. It turned out to be issues with enabling write ahead logs ... When you enable write ahead logs, everything within the forEachRDD method needs to be serializable, which wasn't well documented. This resulted in classes getting serialized that I didn't assume would be.

masmithd
  • 101
  • 1
  • 4