
I'm trying to run the label propagation algorithm on my Apache Flink Gelly graph.
Here is my code:

    Graph<String, Long, String> graph = Graph.fromDataSet(vertex, edgeSet, env).getUndirected();

    // Give every vertex ID a unique Long to use as its initial label
    DataSet<Tuple2<String, Long>> idsWithInitialLabels = DataSetUtils
        .zipWithUniqueId(graph.getVertexIds())
        .map(new MapFunction<Tuple2<Long, String>, Tuple2<String, Long>>() {
            public Tuple2<String, Long> map(Tuple2<Long, String> tuple2) throws Exception {
                return new Tuple2<String, Long>(tuple2.f1, tuple2.f0);  // (vertex ID, label)
            }
        });

    // Overwrite each vertex value with its initial label, then run label propagation
    DataSet<Vertex<String, Long>> verticesWithCommunity = graph
        .joinWithVertices(idsWithInitialLabels, new VertexJoinFunction<Long, Long>() {
            public Long vertexJoin(Long vertexValue, Long inputValue) {
                return inputValue;
            }
        })
        .run(new LabelPropagation<String, Long, String>(10));  // at most 10 iterations

I got the following error message:

    org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.graph.Graph$ApplyCoGroupToVertexValues@4dde0543 not serializable
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
        at org.apache.flink.api.java.DataSet.clean(DataSet.java:186)
        at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets$CoGroupOperatorSetsPredicate$CoGroupOperatorWithoutFunction.with(CoGroupOperator.java:619)
        at org.apache.flink.graph.Graph.joinWithVertices(Graph.java:587)
        at tu.master.ConceptDetection.TextProcessor.clustering(TextProcessor.java:405)
        at tu.master.ConceptDetection.TextProcessor$4.actionPerformed(TextProcessor.java:210)

Thank you for your help :)

  • Hi Nesrine, I've tried reproducing your problem but I couldn't. Which Flink version are you using? Also a tip: you're creating your graph, then zipping your vertices with unique IDs and joining them back with the original vertex set. You can avoid the join by zipping your vertices before creating the graph and then passing the initialized vertex set to `fromDataSet(...)`. – vasia Jul 26 '16 at 22:31
  • Hi Vasia, thank you for your response. The problem was solved by using a lambda expression instead of: `new VertexJoinFunction<Long, Long>() { public Long vertexJoin(Long vertexValue, Long inputValue) { return inputValue; }}` – Nesrine Doghri Jul 28 '16 at 13:12

1 Answer


I'm guessing that the class containing your graph code (`TextProcessor`, judging by the stack trace) is not `Serializable`. An anonymous class declared inside an instance method is a non-static inner class, so it holds a reference to the enclosing instance, i.e. the containing class's `this` (see this answer). If the containing class is not `Serializable`, that `this` reference can't be serialized, and neither can the anonymous class that holds it.
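A minimal, Flink-free sketch of this mechanism, using plain `java.io` serialization (roughly the kind of check `ClosureCleaner.ensureSerializable` in the stack trace performs). `JoinFn` and `Host` are hypothetical stand-ins for `VertexJoinFunction` and `TextProcessor`. The anonymous class deliberately reads a field of `Host` so the enclosing-instance reference exists regardless of compiler version; the Java 8 compiler that was current when this was asked adds that reference even when it is unused.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class AnonymousCaptureDemo {

    // Hypothetical stand-in for VertexJoinFunction: a serializable function type.
    interface JoinFn extends Serializable {
        Long join(Long vertexValue, Long inputValue);
    }

    // Hypothetical stand-in for the enclosing class; NOT Serializable.
    static class Host {
        private long offset = 0L;  // non-final, so reading it requires Host.this

        JoinFn makeAnonymous() {
            // Anonymous class in an instance method: an inner class that keeps
            // a reference to this Host instance (it reads `offset` through it).
            return new JoinFn() {
                @Override
                public Long join(Long vertexValue, Long inputValue) {
                    return inputValue + offset;
                }
            };
        }
    }

    // Writes the object with standard Java serialization, discarding the bytes.
    static void serialize(Object o) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
        }
    }

    public static void main(String[] args) throws IOException {
        JoinFn fn = new Host().makeAnonymous();
        try {
            serialize(fn);
            System.out.println("serialized");
        } catch (NotSerializableException e) {
            // The message names the class that could not be serialized
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

Running `main` should report a `NotSerializableException` naming the `Host` class: the function itself is `Serializable`, but the enclosing instance it drags along is not.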

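That would explain why switching to a lambda expression made it serialize. Lambda expressions are not anonymous classes: a lambda captures the enclosing `this` only if its body actually uses it, for example by reading an instance field or calling an instance method. A lambda that just returns `inputValue` does neither.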
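A second sketch under the same assumptions (hypothetical `JoinFn` and `Host`, plain Java serialization), contrasting a lambda that uses only its parameters with one that reads an instance field. Because `JoinFn` extends `Serializable`, the compiler makes both lambdas serializable, but only the second one captures `this`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LambdaCaptureDemo {

    // Hypothetical serializable functional interface (stand-in for VertexJoinFunction).
    interface JoinFn extends Serializable {
        Long join(Long vertexValue, Long inputValue);
    }

    // Hypothetical non-serializable enclosing class, as in the question.
    static class Host {
        private long offset = 0L;

        // Body uses only its parameters: no enclosing instance is captured.
        JoinFn makeLambda() {
            return (vertexValue, inputValue) -> inputValue;
        }

        // Body reads an instance field, so the lambda captures `this` (a Host).
        JoinFn makeCapturingLambda() {
            return (vertexValue, inputValue) -> inputValue + offset;
        }
    }

    // Returns true if standard Java serialization of `o` succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Host host = new Host();
        System.out.println("non-capturing lambda: " + isSerializable(host.makeLambda()));
        System.out.println("capturing lambda: " + isSerializable(host.makeCapturingLambda()));
    }
}
```

`main` should report `true` for the non-capturing lambda and `false` for the capturing one, so a lambda that merely returns `inputValue` serializes even though its enclosing class does not.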

What it doesn't explain is why declaring your MapFunction as an anonymous class still works. If you still have this code, @Nesrine, I'd be curious what the whole class looks like.

meustrus