I'm trying to build a web API for my Apache Spark jobs using the sparkjava.com framework. My code is:

@Override
public void init() {
    get("/hello",
            (req, res) -> {
                String sourcePath = "hdfs://spark:54310/input/*";

                SparkConf conf = new SparkConf().setAppName("LineCount");
                conf.setJars(new String[] { "/home/sam/resin-4.0.42/webapps/test.war" });
                File configFile = new File("config.properties");

                String sparkURI = "spark://hamrah:7077";

                conf.setMaster(sparkURI);
                conf.set("spark.driver.allowMultipleContexts", "true");
                JavaSparkContext sc = new JavaSparkContext(conf);

                @SuppressWarnings("resource")
                JavaRDD<String> log = sc.textFile(sourcePath);

                JavaRDD<String> lines = log.filter(x -> {
                    return true;
                });

                return lines.count();
            });
}

If I remove the lambda expression, or put the code inside a simple jar rather than a web service (i.e. a servlet), it runs without any error. But using a lambda expression inside a servlet results in this exception:

15/01/28 10:36:33 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hamrah): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1999)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

P.S.: I tried combinations of Jersey and Spark Java with Jetty, Tomcat, and Resin, and all of them led me to the same result.

Mehraban
  • @pzecevic I'm trying to use a combination of Apache Spark and Spark Java, and the error above occurs only when combining them. Without this combination everything works fine. – Mehraban Jan 28 '15 at 08:31
  • I see. Maybe you could file a JIRA or ask the question at Apache Spark user list? – pzecevic Jan 28 '15 at 11:11
  • Strange. Looks like it’s not respecting the `readResolve` method of the `SerializedLambda`. Would be interesting if it fails for ordinary objects with a `readResolve` method too. – Holger Jan 30 '15 at 17:50
  • @Holger I've got the same problem, could you please explain more? – Milad Khajavi Jan 31 '15 at 06:33
  • 2
    @Khajavi: Serializable lambda instances use `witeReplace` to substitute themselves with a [`SerializedLambda`](http://docs.oracle.com/javase/8/docs/api/java/lang/invoke/SerializedLambda.html) instance in the stream. `SerializedLambda` has a `readResolve` method to do the opposite when restoring. `witeReplace` and `readResolve` are [built-in features of Serialization](http://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html). – Holger Feb 01 '15 at 10:39
  • Are you by chance using different compilers depending on what you're building? I know there are some outstanding issues with the Eclipse JDT compiler surrounding lambda deserialization. I created this post based on the problems I was having: http://stackoverflow.com/questions/27488567/invalid-lambda-deserialization/27502424#27502424 – Robert Bain Feb 05 '15 at 20:50
  • @RobertBain The very same code works outside the servlet context. If it were an Eclipse problem, it would fail under any circumstances. – Mehraban Feb 05 '15 at 21:33
  • so, what you're saying is that replacing the lambda with a plain old anonymous class would work as expected? – Olimpiu POP Feb 06 '15 at 10:13
  • I think this bug is related. When I replaced my lambdas with anonymous classes, it worked for me. https://bugs.openjdk.java.net/browse/JDK-8024931 – Gábor Lipták Nov 12 '19 at 13:08

4 Answers

What you have here is a follow-up error which masks the original error.

When lambda instances are serialized, they use `writeReplace` to dissolve their JRE-specific implementation from the persistent form, which is a `SerializedLambda` instance. When the `SerializedLambda` instance has been restored, its `readResolve` method will be invoked to reconstitute the appropriate lambda instance. As the documentation says, it will do so by invoking a special method of the class which defined the original lambda (see also this answer). The important point is that the original class is needed, and that's what's missing in your case.
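
The happy path of that mechanism can be seen in a minimal sketch (class and method names here are hypothetical, and this is plain JDK code, not Spark): serializing a lambda and reading it back in the same JVM, where the defining class is available, lets `readResolve` reconstitute it transparently.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class LambdaRoundTrip {
    // Serialize a lambda and read it back within the same JVM,
    // where the defining class (this one) is available.
    @SuppressWarnings("unchecked")
    static Supplier<String> roundTrip(Supplier<String> s)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(s); // writeReplace puts a SerializedLambda into the stream
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            // SerializedLambda.readResolve calls back into the defining class
            return (Supplier<String>) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // The intersection cast is what makes the lambda serializable at all.
        Supplier<String> lambda = (Supplier<String> & Serializable) () -> "ok";
        System.out.println(roundTrip(lambda).get()); // prints "ok"
    }
}
```

On a Spark worker the second half of this round trip runs in a different JVM, which is exactly where the defining class goes missing.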

But there’s a …special… behavior of the `ObjectInputStream`. When it encounters an exception, it doesn’t bail out immediately. It will record the exception and continue the process, marking all objects currently being read, and thus depending on the erroneous object, as erroneous as well. Only at the end of the process will it throw the original exception it encountered. What makes it so strange is that it will also keep trying to set the fields of these objects. But when you look at the method `ObjectInputStream.readOrdinaryObject`, line 1806:

…
    if (obj != null &&
        handles.lookupException(passHandle) == null &&
        desc.hasReadResolveMethod())
    {
        Object rep = desc.invokeReadResolve(obj);
        if (unshared && rep.getClass().isArray()) {
            rep = cloneArray(rep);
        }
        if (rep != obj) {
            handles.setObject(passHandle, obj = rep);
        }
    }

    return obj;
}

you see that it doesn’t call the `readResolve` method when `lookupException` reports a non-null exception. But when the substitution did not happen, it’s not a good idea to continue trying to set the field values of the referrer, yet that’s exactly what happens here, hence producing a `ClassCastException`.

You can easily reproduce the problem (each class in its own file; they all need `import java.io.*;`):

public class Holder implements Serializable {
    Runnable r;
}
public class Defining {
    public static Holder get() {
        final Holder holder = new Holder();
        holder.r=(Runnable&Serializable)()->{};
        return holder;
    }
}
public class Writing {
    static final File f=new File(System.getProperty("java.io.tmpdir"), "x.ser");
    public static void main(String... arg) throws IOException {
        try(FileOutputStream os=new FileOutputStream(f);
            ObjectOutputStream   oos=new ObjectOutputStream(os)) {
            oos.writeObject(Defining.get());
        }
        System.out.println("written to "+f);
    }
}
public class Reading {
    static final File f=new File(System.getProperty("java.io.tmpdir"), "x.ser");
    public static void main(String... arg) throws IOException, ClassNotFoundException {
        try(FileInputStream is=new FileInputStream(f);
            ObjectInputStream ois=new ObjectInputStream(is)) {
            Holder h=(Holder)ois.readObject();
            System.out.println(h.r);
            h.r.run();
        }
        System.out.println("read from "+f);
    }
}

Compile these four classes and run Writing. Then delete the class file Defining.class and run Reading. Then you will get a

Exception in thread "main" java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field test.Holder.r of type java.lang.Runnable in instance of test.Holder
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)

(Tested with 1.8.0_20)


The bottom line is that you may forget about this serialization issue once you understand what’s happening: all you have to do to solve your problem is make sure that the class which defined the lambda expression is also available in the runtime where the lambda is deserialized.

Example for a Spark job run directly from the IDE (`spark-submit` distributes your jar by default):

SparkConf sconf = new SparkConf()
  .set("spark.eventLog.dir", "hdfs://nn:8020/user/spark/applicationHistory")
  .set("spark.eventLog.enabled", "true")
  .setJars(new String[]{"/path/to/jar/with/your/class.jar"})
  .setMaster("spark://spark.standalone.uri:7077");
Holger
  • You said `all you have to do for solving your problem is to make sure that the class which defined the lambda expression is also available in the runtime where the lambda is deserialized.` Can you tell me how it can be done? Can you give an example? – ѕтƒ Feb 09 '16 at 12:05
  • 6
    @ѕтƒ Just invoke `setJars(new String[]{"/path/to/jar/with/your/class.jar"})` on your SparkConf instance if you run code from IDE like Idea. `spark-submit` distributes your jar by default, so there're no such issues – morsik Feb 21 '16 at 20:03
  • 1
    @Holger - amazing! :) Out of curiosity, how does it work for lambdas defined in the Spark shell/repl? I imagine the defining class for those is anonymous but somehow they get deserialized too? – George Apr 08 '18 at 18:29
  • 2
    @George I’m not involved in the development of Spark, so I don’t know. But I can imagine that the shell remembers the definitions you’ve entered, which is a transferable form, sufficient to replicate the code on the other side. So only the code of the shell resp. of its backend needs to be available on the other side. – Holger Apr 09 '18 at 06:10
  • Excellent analysis, well done! IMHO that *speshul* behavior in fact secretly swallows exceptions, which is a big fat antipattern and something that definitely shouldn't be lurking in JVM's code. Perhaps we should post a bug report? – Martin Vysny Feb 10 '21 at 08:12
  • 1
    @MartinVysny it’s so long ago, I can’t remember why I didn’t. Perhaps, because I was aware of [this fundamental issue](https://bugs.openjdk.java.net/browse/JDK-8024931) and thought, it’s broken anyway. But since the issue of this answer is avoidable, posting a bug report makes sense. I’ll check whether there is an existing one and file a new one if not. – Holger Feb 11 '21 at 16:15

I had the same error, and when I replaced the lambda with an inner class, it worked. I don't really understand why, and reproducing this error was extremely difficult (we had one server which exhibited the behavior, and nowhere else).

Doesn't work

this.variable = () -> { ..... }

Yields java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field MyObject.val$variable

Works

this.variable = new MyInterface() {
    public void myMethod() {
       .....
    }
};
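
A closely related variant of this workaround, sketched here with a hypothetical `MyInterface` (the answer does not show the actual interface): a named static nested class instead of an anonymous one. It is serialized as an ordinary class, and unlike an anonymous inner class it holds no reference to the enclosing instance, so the outer class does not have to be serializable.

```java
import java.io.Serializable;

public class Outer {
    // Hypothetical stand-in for the functional interface used above.
    public interface MyInterface extends Serializable {
        String myMethod();
    }

    // Named static nested class: serialized like any ordinary class,
    // and captures no enclosing 'this', unlike an anonymous inner class.
    public static final class MyMethodImpl implements MyInterface {
        @Override
        public String myMethod() {
            return "did work";
        }
    }

    public MyInterface variable = new MyMethodImpl();
}
```

As with the anonymous class, the fix works because `MyMethodImpl` is a normal class file that can be shipped to the workers, with no `SerializedLambda` indirection involved.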
Adrian Smith
  • Same here: replaced it with an anonymous class and it works. I don't have problems with the jar, because the jar was sent to Spark in the configuration; I don't know what is causing this behavior with lambdas – Danilow Jun 14 '17 at 04:25
  • In my case it's very weird, because it only doesn't work with one specific lambda of type FilterFunction; other lambdas in the same class work fine – Danilow Jun 14 '17 at 15:40

I suppose your problem is failed auto-boxing. In the code

x -> {
      return true;
}

you pass a `(String -> boolean)` lambda (as for a `Predicate<String>`), while the `filter` method takes a `(String -> Boolean)` lambda (it is a `Function<String, Boolean>`). So I suggest you change the code to

x -> {
      return Boolean.TRUE;
}

Please include more details in your question. Output from `uname -a` and `java -version` is appreciated. Provide an SSCCE if possible.

Sergey Fedorov
  • I think you missed the note that "If I put it inside a simple jar rather than a web service (somehow a servlet) it will run without any error". – Mehraban Feb 06 '15 at 11:47
  • @SAM I've read that part. I suppose this is an error either in the servlet container or in Java. I suggest you change the code *not to fix a problem in your code* but *to work around an environment problem*. – Sergey Fedorov Feb 06 '15 at 12:15

You can maybe more simply replace your Java 8 lambda with an anonymous Spark `Function` class.

replace

output = rdds.map(x -> this.function(x)).collect();

with:

output = rdds.map(new Function<Double, Double>() {
    public Double call(Double x) {
        return MyClass.this.function(x);
    }
}).collect();
Nicolas Zozol