12

I'm new to Spark and was trying to run the example JavaSparkPi.java. It runs fine, but because I need to use this from other Java code, I copied everything from main into a method of the class and called that method from main. It then fails with:

org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException

The code looks like this:

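import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

public class JavaSparkPi {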

public void cal(){
    JavaSparkContext jsc = new JavaSparkContext("local", "JavaLogQuery");
    int slices = 2;
    int n = 100000 * slices;

    List<Integer> l = new ArrayList<Integer>(n);
    for (int i = 0; i < n; i++) {
        l.add(i);
    }

    JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);

    System.out.println("count is: "+ dataSet.count());
    dataSet.foreach(new VoidFunction<Integer>(){
        public void call(Integer i){
            System.out.println(i);
        }
    });

    int count = dataSet.map(new Function<Integer, Integer>() {
        @Override
        public Integer call(Integer integer) throws Exception {
            double x = Math.random() * 2 - 1;
            double y = Math.random() * 2 - 1;
            return (x * x + y * y < 1) ? 1 : 0;
        }
    }).reduce(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) throws Exception {
            return integer + integer2;
        }
    });

    System.out.println("Pi is roughly " + 4.0 * count / n);
}

public static void main(String[] args) throws Exception {

    JavaSparkPi myClass = new JavaSparkPi();
    myClass.cal();
}
}

Does anyone have an idea what causes this? Thanks!

user2810081
  • You omitted the most important line in the error. What class is not Serializable? – Sean Owen Jun 04 '14 at 20:27
  • @SeanOwen Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.mycompany.sparkproject2.JavaSparkPi. Is this the one? – user2810081 Jun 04 '14 at 20:32
  • I think your question has been answered here: http://stackoverflow.com/questions/22592811/scala-spark-task-not-serializable-java-io-notserializableexceptionon-when/22594142#22594142 – samthebest Jun 05 '14 at 09:46
  • Possible duplicate of [Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects](https://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou) – jopasserat Sep 01 '17 at 17:32

3 Answers

16

The nested functions hold a reference to the containing object (JavaSparkPi). So this object will get serialized. For this to work, it needs to be serializable. Simple to do:

public class JavaSparkPi implements Serializable {
  ...
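
As a minimal sketch of why this works, the following uses plain Java serialization instead of Spark itself. IntFunction is a hypothetical stand-in for Spark's Function interface, and the class name SerializableOuterSketch and the serialize helper are invented for illustration. The point it tries to show: once the enclosing class is Serializable, writing out the anonymous function succeeds even if it carries a hidden reference to its enclosing instance.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Assumption: this class plays the role of JavaSparkPi from the question.
public class SerializableOuterSketch implements Serializable {

    // Hypothetical stand-in for Spark's Function interface (not the real API).
    public interface IntFunction extends Serializable {
        int call(int value);
    }

    // An anonymous class created in an instance method may hold a hidden
    // reference to the enclosing instance, which is then serialized with it.
    public IntFunction makeFunction() {
        return new IntFunction() {
            @Override
            public int call(int value) {
                return value + 1;
            }
        };
    }

    // Writes the object with standard Java serialization and returns the bytes.
    // Any IOException (including NotSerializableException) is rethrown unchecked.
    public static byte[] serialize(Object obj) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(new SerializableOuterSketch().makeFunction());
        System.out.println("serialized " + data.length + " bytes");
    }
}
```

If the enclosing class had a field of a non-serializable type, that field would have to be marked transient or the write would fail again.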
Daniel Darabos
  • This is right. Alternatively, make the anonymous classes static nested classes if you can't or don't want to make the outer class serializable. This is more hassle code-wise, though. – Sean Owen Jun 05 '14 at 06:24
  • http://stackoverflow.com/questions/40818001/understanding-spark-serialization/40818002?sfb=2#40818002 an overview for understanding spark serialization – KrazyGautam Nov 26 '16 at 14:37
2

The main problem is that when you create an anonymous class in Java, it is passed a reference to the enclosing instance. This can be fixed in several ways:

Declare the enclosing class Serializable

This works in your case, but it will fail if your enclosing class has a field that is not serializable. I would also say that serializing the enclosing class is wasteful when the function does not use it.

Create the Closure in a static function

Creating the closure inside a static function does not give it a reference to the enclosing instance, so the enclosing class does not need to be serializable.
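
The static-function approach could look roughly like the sketch below. It uses plain Java serialization rather than Spark, and IntFunction, StaticClosureSketch, addOne and roundTrip are all hypothetical names chosen for illustration. The enclosing class is deliberately not Serializable, yet the function created in the static method can still be written out and read back.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Assumption: stands in for an enclosing class that is NOT Serializable.
public class StaticClosureSketch {

    // Hypothetical stand-in for Spark's Function interface (not the real API).
    public interface IntFunction extends Serializable {
        int call(int value);
    }

    // Created in a static context, so there is no enclosing instance to capture.
    public static IntFunction addOne() {
        return new IntFunction() {
            @Override
            public int call(int value) {
                return value + 1;
            }
        };
    }

    // Serializes the function and deserializes it again, roughly what happens
    // when a function is shipped from one JVM to another.
    public static IntFunction roundTrip(IntFunction fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (IntFunction) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        IntFunction copy = roundTrip(addOne());
        System.out.println(copy.call(41));
    }
}
```

In the question's code, the equivalent change would be to build the map and reduce functions in static methods (or static nested classes) and pass the results to dataSet.map and reduce.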

Anuj J
0

This error occurs because Spark serializes the function passed to foreach so that it can be shipped to the executors that run the tasks. Your function

 dataSet.foreach(new VoidFunction<Integer>(){
        public void call(Integer i){
            System.out.println(i);
        }
    });

is an anonymous inner class, so it holds a reference to the enclosing JavaSparkPi instance, and that class is not serializable. The println() call itself is not what fails. If you only need to print the elements, you can avoid serializing a function for that step by collecting the data to the driver and printing it there, which is fine for a small data set like this one:

for (Integer i : dataSet.collect()) {
    System.out.println(i);
}
Jeetu