
Here is my code:

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

JavaPairDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zkQuorum, group, topicMap);
messages.print();

// Fails with "Task not serializable" at this map call:
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

I get the below error:

ERROR:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:140)
    at org.apache.spark.streaming.api.java.JavaPairDStream.map(JavaPairDStream.scala:46)

  • Well, if it's working it's great :). If it's not, you can turn on Java serialization debugging with `-Dsun.io.serialization.extendedDebugInfo=true`. – Daniel Darabos Mar 27 '15 at 12:18
  • Thank you, it's not OK, I've tried. JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { @Override public String call(Tuple2<String, String> tuple2) { return tuple2._2(); } }); this line of code is the problem. – xiaolong li Mar 28 '15 at 02:28
  • Pretty sure this code is Java and not Scala (i.e. tag) – SparkleGoat May 25 '17 at 17:28
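
A note on the debugging flag suggested in the comments: the "Task not serializable" exception is thrown on the driver (inside ClosureCleaner), so the system property must be set on the driver JVM. Assuming the job is launched with spark-submit, one way to pass it is:

spark-submit --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true" ...

With the flag enabled, the resulting NotSerializableException message includes the chain of fields through which the non-serializable object was reached.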

2 Answers


Since you're defining your map function as an anonymous inner class, the containing class must also be Serializable. Define your map function as a separate top-level class, or make it a static nested class. From the Java Object Serialization Specification (http://docs.oracle.com/javase/8/docs/platform/serialization/spec/serial-arch.html):

Note - Serialization of inner classes (i.e., nested classes that are not static member classes), including local and anonymous classes, is strongly discouraged for several reasons. Because inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, serializing such an inner class instance will result in serialization of its associated outer class instance as well.
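
For example, a minimal sketch of the static nested class approach (the class name ExtractValue is illustrative, not from the original code):

// Spark's Function interface already extends java.io.Serializable, and a
// static nested class holds no hidden reference to its enclosing instance,
// so only this small object is serialized with the task.
static class ExtractValue implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
}

JavaDStream<String> lines = messages.map(new ExtractValue());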

  • Spark is trying to serialize the object passed to map, but can't, because it doesn't implement Serializable? Why is Spark doing serialization? And if we define the map function as a separate class, do we need to make it Serializable as well? – Johan Aug 02 '16 at 21:56

Just providing a code sample:

JavaDStream<String> lines = messages.map(mapFunc);

Declare the anonymous inner class instance as a static field:

static Function<Tuple2<String, String>, String> mapFunc = new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
};
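
As a further variation (not from the original answer): on Java 8 with Spark 1.0 or later, where Function is a functional interface, a lambda that touches nothing but its own parameter avoids the implicit outer-class reference entirely:

// The lambda captures nothing from the enclosing class, so there is no
// outer instance to drag into serialization.
JavaDStream<String> lines = messages.map(tuple2 -> tuple2._2());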