
I've come across a problem with Spark, using the JavaPairRDD.repartitionAndSortWithinPartitions method. I've tried everything any reasonable person would have thought of. I finally wrote a small snippet simple enough to visualize the problem:

import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        final List<String> list = Arrays.asList("I", "am", "totally", "baffled");
        final HashPartitioner partitioner = new HashPartitioner(2);

        doSomething(sc, list, partitioner, String.CASE_INSENSITIVE_ORDER);
        doSomething(sc, list, partitioner, Main::compareString);
        doSomething(sc, list, partitioner, new StringComparator());
        doSomething(sc, list, partitioner, new SerializableStringComparator());
        doSomething(sc, list, partitioner, (s1,s2) -> Integer.compare(s1.charAt(0),s2.charAt(0)));
    }

    public static <T> void doSomething(JavaSparkContext sc, List<T> list, Partitioner partitioner, Comparator<T> comparator) {
        try {
            sc.parallelize(list)
                .mapToPair(elt -> new Tuple2<>(elt,elt))
                .repartitionAndSortWithinPartitions(partitioner,comparator)
                .count();
            System.out.println("success");
        } catch (Exception e) {
            System.out.println("failure");
        }
    }

    public static int compareString(String s1, String s2) {
        return Integer.compare(s1.charAt(0),s2.charAt(0));
    }

    public static class StringComparator implements Comparator<String> {
        @Override
        public int compare(String s1, String s2) {
            return Integer.compare(s1.charAt(0),s2.charAt(0));
        }
    }

    public static class SerializableStringComparator implements Comparator<String>, Serializable {
        @Override
        public int compare(String s1, String s2) {
            return Integer.compare(s1.charAt(0),s2.charAt(0));
        }
    }
}

Apart from the Spark logging, it outputs:

success
failure
failure 
success
failure

The exception thrown in case of failure is always the same:

org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException
sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:483)
org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:835)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:847)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Now I've got my fix: declare my custom comparator as Serializable (I checked the standard library code; the case-insensitive string comparator is declared serializable, so that makes sense).

But why? Why should I not use lambdas here? I would have expected the second and the last calls to work, since they only use static methods and classes.
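
For what it's worth, here is a minimal sketch of the workarounds, based on the snippet above; the intersection casts are my own addition rather than anything Spark-specific, they simply make the lambda and the method reference serializable so the closure serializer (plain Java serialization) can ship them:

        // comparator explicitly declared Serializable (already succeeds above)
        doSomething(sc, list, partitioner, new SerializableStringComparator());

        // intersection casts turn the lambda and the method reference into
        // serializable objects, so task serialization no longer fails
        doSomething(sc, list, partitioner,
            (Comparator<String> & Serializable) (s1, s2) -> Integer.compare(s1.charAt(0), s2.charAt(0)));
        doSomething(sc, list, partitioner,
            (Comparator<String> & Serializable) Main::compareString);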

What I find especially weird is that I have registered the classes I am trying to serialize with Kryo, and the classes I did not register can be trivially serialized with their default serializer (Kryo associates FieldSerializer with most classes by default). However, the Kryo registrator is never executed before the task fails to serialize.
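
For reference, this is roughly what the registrator looks like (MyRegistrator is a hypothetical name, wired in through spark.kryo.registrator); its registerClasses method is never even called before the failure:

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

public class MyRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        // Kryo's default FieldSerializer would handle this trivially (no fields)
        kryo.register(Main.StringComparator.class);
    }
}

// conf.set("spark.kryo.registrator", MyRegistrator.class.getName());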

  • The deal is either you write the serialization or let Spark do it for you. If you do it, then you need to implement Serializable. – ayan guha May 27 '15 at 01:12
  • Lambdas are not Serializable by default. Check this post: http://stackoverflow.com/questions/22807912/how-to-serialize-a-lambda – kostya May 27 '15 at 03:32
  • @ayanguha in my real-world example, I'm using Kryo serialization, and since I did not specify a serializer for `Comparator` I expected it to default to a `FieldSerializer`, which is fine since I have no fields! – Dici May 27 '15 at 08:58
  • @kostya I'm gonna test it shortly – Dici May 27 '15 at 08:59
  • The exception is thrown even before my Kryo registrator is executed... This makes no sense. – Dici May 27 '15 at 09:22

1 Answer


My question did not clearly explain why I was so baffled (about the Kryo registration code not being executed), so I edited it to make that explicit.

I have figured out that Spark uses two different serializers:

  • one for serializing the tasks from the master to the slaves, called closureSerializer in the code (see SparkEnv.scala). As of this writing, it can only be set to JavaSerializer.

  • one for serializing the actual data that is processed, called serializer in SparkEnv. This one can be set to either JavaSerializer or KryoSerializer.

Registering a class with Kryo does not guarantee that it will always be serialized with Kryo; that depends on how it is used. For example, the DAGScheduler only uses the closureSerializer, so no matter how you configure data serialization, objects manipulated by the DAGScheduler at some point will always need to be Java-serializable (unless Spark enables Kryo serialization for closures in a later release).
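
To illustrate the distinction, here is a minimal configuration sketch (reusing the classes from my question); it only changes the data serializer and has no effect on how the task closures are shipped:

SparkConf conf = new SparkConf()
        .setAppName("test")
        .setMaster("local")
        // affects the data serializer only
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(new Class<?>[]{ Main.StringComparator.class });

// The comparator still travels inside the task closure, which the DAGScheduler
// serializes with plain Java serialization, hence it must implement Serializable.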
