
I have years of experience with Java 8 and its lambdas, but I ran into a baffling problem while developing a hello-world-sized Spark program.

Here is a Java class; the @Data annotation comes from Lombok:

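@Data
@NoArgsConstructor   // Lombok; bean encoders expect a public no-arg constructor
@AllArgsConstructor  // Lombok; provides the Person(String, Long) constructor used below
public class Person implements Serializable {
  private String name;
  private Long age;
}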

Then I built a Java list containing objects of the Person class:

        Person p1 = new Person("sb", 1L);
        Person p2 = new Person("sth", null);
        List<Person> list = new ArrayList<>(2);
        list.add(p1);
        list.add(p2);

So far, so good. Then I tried to create a Spark Dataset from the list:

SparkSession session = SparkSession.builder().master("local[1]").appName("SparkSqlApp").getOrCreate();
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> dataset1 = session.createDataset(list, personEncoder);
dataset1.foreach(new ForeachFunction<Person>() { // 1
    @Override
    public void call(Person person) throws Exception {
        System.out.println(person);
    }
});
dataset1.foreach((ForeachFunction<Person>) System.out::println); // 2

Note that block 1 is equivalent to block 2 in Java; block 2 was produced from block 1 by IntelliJ IDEA's simplification. The only difference is that block 2 uses a lambda expression.

However, when I run the program, block 1 finishes normally while block 2 throws an exception.

What on earth? Why does the JVM or the Spark engine behave like this?!

Sheldon Wei
  • This is interesting. I usually see the opposite behavior -- block 1 fails, but block 2 succeeds. The problem regardless is that the function is being defined as an object which refers to the outer class. That is, the class which contains the code that got the session and built the dataset is the outer class. It's not serializable, nor should it be. – boneill Nov 24 '21 at 03:34
  • By the way, have you tried with a newer Java version? Perhaps the compiler is a bit "smarter" now and doesn't create an unnecessary reference to the outer class. – boneill Nov 24 '21 at 03:37
  • Yes, it comes down to the `capture` thing. I read the generated code by adding the VM argument `-Djdk.internal.lambda.dumpProxyClasses`. – Sheldon Wei Nov 25 '21 at 11:32

2 Answers


As explained in "What is the equivalent lambda expression for System.out::println", the method reference System.out::println is not identical to the lambda expression x -> System.out.println(x).

The method reference captures the current value of System.out, to invoke println on it each time the function is invoked, rather than evaluating System.out again each time as the lambda expression’s body does.
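The difference in evaluation time can be made visible with a small sketch. The class name CaptureDemo, the helper run, and the use of print instead of println are choices made for this illustration only; System.out is redirected to in-memory buffers so the result can be inspected.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.function.Consumer;

class CaptureDemo {

    /**
     * Creates a printer while System.out points at a first buffer, redirects
     * System.out to a second buffer, then invokes the printer.
     * Returns the contents of { first buffer, second buffer }.
     */
    static String[] run(boolean useMethodReference) {
        PrintStream saved = System.out;
        ByteArrayOutputStream first = new ByteArrayOutputStream();
        ByteArrayOutputStream second = new ByteArrayOutputStream();
        try {
            System.setOut(new PrintStream(first, true));
            Consumer<String> printer;
            if (useMethodReference) {
                printer = System.out::print;         // System.out is read here, once
            } else {
                printer = s -> System.out.print(s);  // System.out is read on every call
            }
            System.setOut(new PrintStream(second, true));
            printer.accept("hello");
        } finally {
            System.setOut(saved);                    // always restore the real stream
        }
        return new String[] { first.toString(), second.toString() };
    }

    public static void main(String[] args) {
        String[] ref = run(true);
        String[] lambda = run(false);
        System.out.println("method reference: first=[" + ref[0] + "] second=[" + ref[1] + "]");
        System.out.println("lambda:           first=[" + lambda[0] + "] second=[" + lambda[1] + "]");
    }
}
```

With the method reference, the text lands in the first buffer, because the PrintStream was captured before the redirect; with the lambda, it lands in the second.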

As also said, this rarely makes a difference, but here, it does. When you try to serialize the function, it will try to serialize all captured values, including the PrintStream instance read from System.out during the instantiation. The PrintStream is not serializable and it would be quite challenging to implement a serializable PrintStream fulfilling the expectations.
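A minimal sketch of that failure outside of Spark, assuming a hypothetical SerializableConsumer interface that stands in for a serializable function type such as ForeachFunction. It serializes into an in-memory buffer and reports what happened.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Consumer;

class SerializationDemo {

    /** Hypothetical stand-in for a serializable functional interface. */
    interface SerializableConsumer<T> extends Consumer<T>, Serializable {}

    /** Returns "ok" on success, otherwise the exception type and message. */
    static String trySerialize(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return "ok";
        } catch (IOException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    static String methodReference() {
        // Captures the PrintStream currently stored in System.out.
        SerializableConsumer<String> f = System.out::println;
        return trySerialize(f);
    }

    static String lambda() {
        // Captures nothing; System.out is looked up when the body runs.
        SerializableConsumer<String> f = s -> System.out.println(s);
        return trySerialize(f);
    }

    public static void main(String[] args) {
        System.out.println("method reference: " + methodReference());
        System.out.println("lambda:           " + lambda());
    }
}
```

The method reference variant should report a NotSerializableException naming the captured stream's class, while the lambda variant serializes, since it has no captured values.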

But it’s important to keep in mind that when you serialize the lambda expression x -> System.out.println(x) or an equivalent class object and deserialize it in a different environment, the System.out it will read there will evaluate to a different PrintStream than in your original environment. This doesn’t matter when the distributed computing framework takes care to pipe everything printed to the standard output back to the originator.

More generally, static fields are not part of the serialized data, so they may have different contents in different environments.
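The point about static fields can be sketched in a single JVM. The names StaticFieldDemo, Settings, environment and jobName are made up for this example, and changing the static field between writing and reading merely simulates "a different environment"; a real cluster would involve separate JVMs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class StaticFieldDemo {

    static class Settings implements Serializable {
        private static final long serialVersionUID = 1L;
        static String environment = "driver";  // static: not written to the stream
        String jobName = "demo";               // instance field: written to the stream
    }

    static String run() {
        try {
            Settings.environment = "driver";
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new Settings());
            }

            // Simulate the receiving side having its own value for the static field.
            Settings.environment = "executor";

            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                Settings copy = (Settings) in.readObject();
                return copy.jobName + " @ " + Settings.environment;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The instance field travels with the object, while the static field reflects whatever the reading side currently holds, so run() returns "demo @ executor".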

Holger
  • Sounds like it occurs only with `System.out`? I replaced it with a logging framework and bang! It succeeded. `ForeachFunction functionBody = log::info;` – Sheldon Wei Nov 24 '21 at 10:42
  • Depends on the logging framework. It will work if `log` is serializable. – Holger Nov 24 '21 at 10:46
  • It does not seem to be related to the framework. I use `java.util.logging.Logger`, which is not serializable. – Sheldon Wei Nov 25 '21 at 02:05
  • Not for the standard setup: https://ideone.com/F5lQZF “NotSerializableException: java.util.logging.Logger”. However, in a specific environment, a log manager may return a subclass of `Logger` with serialization (or RMI) support; further, the framework could use an extended serialization which can handle loggers in a special way. – Holger Nov 25 '21 at 08:36

The interface ForeachFunction extends Serializable, so Dataset.foreach(f) may be serializing the argument f. In the following test, testBlock1 succeeds and testBlock2 fails (NotSerializableException), but I don't know why.

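import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
// plus an import for JUnit's @Test (the JUnit version is not stated here)

public class AAA implements Serializable {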

    @FunctionalInterface
    public interface ForeachFunction<T> extends Serializable {
        void call(T t) throws Exception;
    }

    @Test
    public void testBlock1() throws FileNotFoundException, IOException {
        ForeachFunction<String> functionBody = new ForeachFunction<String>() {
            public void call(String t) throws Exception {
                System.out.println(t);
            }
        };
        try (FileOutputStream fos = new FileOutputStream("data/block1.obj");
            ObjectOutputStream oos = new ObjectOutputStream(fos)) {
            oos.writeObject(functionBody);  // success
        }
    }

    @Test
    public void testBlock2() throws FileNotFoundException, IOException {
        ForeachFunction<String> functionBody = System.out::println;
        try (FileOutputStream fos = new FileOutputStream("data/block2.obj");
            ObjectOutputStream oos = new ObjectOutputStream(fos)) {
            oos.writeObject(functionBody);  // fail (NotSerializableException)
        }
    }
}
  • I tested your cases and indeed, even `functionBody = t -> System.out.println(t)` succeeds. So I suppose the source of the problem is the method reference. You gave me a huge hand. – Sheldon Wei Nov 24 '21 at 07:18
  • If the test class `AAA` does not implement `Serializable` in my code, `testBlock1` will also fail. The `functionBody` in `testBlock1` is an anonymous inner class of the test class `AAA` and should be serialized with an instance of the class `AAA` that encloses it. However, the `functionBody` in `testBlock2` is not an inner class of class `AAA` and does not seem to implement `Serializable` in substance. –  Nov 24 '21 at 07:44
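The enclosing-instance effect described in the comments can be sketched as follows. All names here (EnclosingInstanceDemo, Task, prefix) are hypothetical. The anonymous class created in the instance method reads a field of the enclosing object, so it certainly holds a reference to it; compilers have historically kept that reference even when it is unused, which matches the behavior reported above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class EnclosingInstanceDemo {  // deliberately NOT Serializable

    /** Hypothetical serializable callback type. */
    interface Task extends Serializable {
        void call(String s);
    }

    private String prefix = "> ";

    /** Anonymous class created in an instance context: holds a reference to 'this'. */
    Task fromInstanceMethod() {
        return new Task() {
            @Override
            public void call(String s) {
                System.out.println(prefix + s);
            }
        };
    }

    /** Anonymous class created in a static context: no enclosing instance. */
    static Task fromStaticMethod() {
        return new Task() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        };
    }

    /** Returns "ok" on success, otherwise the exception type and message. */
    static String trySerialize(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return "ok";
        } catch (IOException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("instance context: " + trySerialize(new EnclosingInstanceDemo().fromInstanceMethod()));
        System.out.println("static context:   " + trySerialize(fromStaticMethod()));
    }
}
```

The instance-context object drags the non-serializable outer object into the stream and fails, while the static-context one serializes. Making the outer class Serializable, as AAA does, is the other way to get the first case through.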