I'm trying to run the Label Propagation algorithm on my Apache Flink Gelly graph.
Here is my code:
    Graph<String, Long, String> graph = Graph.fromDataSet(vertex, edgeSet, env).getUndirected();

    // Assign a unique Long label to every vertex id, then swap to (vertexId, label).
    DataSet<Tuple2<String, Long>> idsWithInitialLabels = DataSetUtils
        .zipWithUniqueId(graph.getVertexIds())
        .map(new MapFunction<Tuple2<Long, String>, Tuple2<String, Long>>() {
            public Tuple2<String, Long> map(Tuple2<Long, String> tuple2) throws Exception {
                return new Tuple2<String, Long>(tuple2.f1, tuple2.f0);
            }
        });

    // Use the generated label as the vertex value, then run label propagation for 10 iterations.
    DataSet<Vertex<String, Long>> verticesWithCommunity = graph
        .joinWithVertices(idsWithInitialLabels,
            new VertexJoinFunction<Long, Long>() {
                public Long vertexJoin(Long vertexValue, Long inputValue) {
                    return inputValue;
                }
            })
        .run(new LabelPropagation<String, Long, String>(10));
I got the following error message:
    org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.graph.Graph$ApplyCoGroupToVertexValues@4dde0543 not serializable
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
        at org.apache.flink.api.java.DataSet.clean(DataSet.java:186)
        at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets$CoGroupOperatorSetsPredicate$CoGroupOperatorWithoutFunction.with(CoGroupOperator.java:619)
        at org.apache.flink.graph.Graph.joinWithVertices(Graph.java:587)
        at tu.master.ConceptDetection.TextProcessor.clustering(TextProcessor.java:405)
        at tu.master.ConceptDetection.TextProcessor$4.actionPerformed(TextProcessor.java:210)
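One possible reading of the trace, offered as a sketch rather than a confirmed diagnosis: the exception is raised while Flink checks that the function passed to joinWithVertices can be serialized. If that anonymous VertexJoinFunction is created inside an instance method of a class that is not itself Serializable (an assumption; the trace only shows that the call comes from TextProcessor.clustering), the anonymous class would carry a hidden reference to the enclosing object and fail Java serialization. The JDK-only example below reproduces that effect without Flink. JoinFn, Outer, TakeInput and isSerializable are hypothetical names used for illustration and are not part of Flink or of the code above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {

    /** Hypothetical stand-in for a serializable function interface. */
    interface JoinFn extends Serializable {
        Long join(Long vertexValue, Long inputValue);
    }

    /** Hypothetical stand-in for an enclosing class that is NOT Serializable. */
    static class Outer {
        // Anonymous class created in an instance method:
        // it keeps an implicit reference to the enclosing Outer instance.
        JoinFn anonymous() {
            return new JoinFn() {
                public Long join(Long vertexValue, Long inputValue) {
                    return inputValue;
                }
            };
        }
    }

    /** Static nested class: it holds no reference to any enclosing instance. */
    static class TakeInput implements JoinFn {
        public Long join(Long vertexValue, Long inputValue) {
            return inputValue;
        }
    }

    /** Returns true if plain Java serialization accepts the object. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("anonymous inner class serializable: "
                + isSerializable(new Outer().anonymous()));
        System.out.println("static nested class serializable:   "
                + isSerializable(new TakeInput()));
    }
}
```

If this is indeed the cause, moving the join function (and the MapFunction) into static nested or top-level classes, or making the enclosing class Serializable, would be the usual things to try.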
Thank you for your help :)