1

I am seeing a behavior in Spark ( 2.2.0 ) I do not understand, but guessing it's related to Lambda and Anonymous classes, when trying to extract out a lambda function:

This works:

public class EventsFilter
{
    public Dataset< String > filter( Dataset< String > events )
    {
        return events.filter( ( FilterFunction< String > ) x -> x.length() > 3 );
    }
}

Yet this does not:

public class EventsFilter
{
    public Dataset< String > filter( Dataset< String > events )
    {
        FilterFunction< String > filter = new FilterFunction< String >(){
            @Override public boolean call( String value ) throws Exception
            {
                return value.length() > 3;
            }
        };
        return events.filter( filter );
    }
}

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) ...
...
Caused by: java.io.NotSerializableException: ...EventsFilter
   ..Serialization stack:
- object not serializable (class: ...EventsFilter, 
value:...EventsFilter@e521067)
    - field (class: .EventsFilter$1, name: this$0, type: class ..EventsFilter)
.   - object (class ...EventsFilter$1, ..EventsFilter$1@5c70d7f0)
.   - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 4)
    - field (class: 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)

I am testing against:

@Test
public void test()
{
    EventsFilter filter = new EventsFilter();
    Dataset<String> input = SparkSession.builder().appName( "test" ).master( "local" ).getOrCreate()
            .createDataset( Arrays.asList( "123" , "123"  , "3211" ) ,
                Encoders.kryo( String.class ) );

    Dataset<String> res = filter.filter( input );
    assertThat( res.count() , is( 1l ) );
}

Even weirder, when put in a static main, both seem to work...

How is defining the function explicitly inside a method causing that sneaky 'this' reference serialization?

harel
  • 525
  • 5
  • 21

2 Answers2

3

Java's inner classes holds reference to outer class. Your outer class is not serializable, so exception is thrown.

Lambdas does not hold reference if that reference is not used, so there's no problem with non-serializable outer class. More here

T. Gawęda
  • 15,706
  • 4
  • 46
  • 61
  • Which inner class / outer class are you talking about? What's the difference between the lambda and the function variable 'filter' and why is it not a free variable? – harel Sep 17 '17 at 20:00
  • 2
    Outer class i just your main class EventsFilter, inner class = anonymous class. Lamdbas are also using inner classe, but in different way :)I will add more detailed description later, in few hours – T. Gawęda Sep 18 '17 at 02:01
  • 2
    Interesting related topics: “[How will Java lambda functions be compiled?](https://stackoverflow.com/q/16827262/2711488)” and “[Does a lambda expression create an object on the heap every time it's executed?](https://stackoverflow.com/q/27524445/2711488)”… – Holger Sep 18 '17 at 10:11
1

I was under the false impression that Lambdas are implemented under the hood as inner classes. This is no longer the case (very helpful talk). Also, as T. Gawęda answered, inner classes do in fact hold reference to the outer class, even if it is not needed (here). This difference explains the behavior.

harel
  • 525
  • 5
  • 21