I created a simple Job for Apache Flink that uses the PageRank implementation provided with Gelly.
Locally, running inside the IDE, everything works fine. However, when I submit a JAR with my job to a Flink instance running on my machine through the JobManager web interface, Flink does not show the correct plan for the job and does not execute PageRank. Instead, it presents and executes a very strange plan that only counts the number of vertices of the graph.
After some research and debugging, I found out that the PageRank implementation provided with Gelly starts by calculating the number of vertices of the graph when that number is not provided as a parameter to the algorithm:
```java
if (numberOfVertices == 0) {
    numberOfVertices = network.numberOfVertices();
}
```
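This calculation implies an embedded job: `numberOfVertices()` calls `count()` on the vertex `DataSet`, and `count()` is eager, so it immediately executes a job of its own. Because the other operators are lazy, this count is the first point at which the program actually triggers an execution. On the Flink server, the first thing done is to obtain the job plan. This is done by a special environment, `OptimizerPlanEnvironment`, which provides the following `execute` method: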
```java
public JobExecutionResult execute(String jobName) throws Exception {
    Plan plan = createProgramPlan(jobName);
    this.optimizerPlan = compiler.compile(plan);
    // do not go on with anything now!
    throw new ProgramAbortException();
}
```
This is where the issue comes from. As soon as the `ProgramAbortException` is thrown, the program returns the plan compiled so far. But at that point only the plan of the embedded (vertex-count) job has been compiled, so the main job plan is never computed or executed.
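To make this sequence concrete, here is a heavily simplified model in plain Java, without Flink. Every class and method in it (`PlanCapturingEnvironment`, `declare`, `count`, `extractPlan`) is a hypothetical stand-in, not Flink's API; it only assumes the behavior described above: operators are lazy, `count()` is eager and calls `execute()`, and the plan-extracting `execute()` compiles the current plan and then throws.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of plan extraction; none of these
// classes are Flink's real ones.
public class PlanExtractionSketch {

    /** Stand-in for Flink's ProgramAbortException. */
    static class ProgramAbortException extends RuntimeException {}

    /** Records lazily declared operators; only execute() would run anything. */
    static class PlanCapturingEnvironment {
        final List<String> declaredOperators = new ArrayList<>();
        List<String> compiledPlan;   // plan captured by the first execute()
        int executeCalls = 0;

        // Lazy operator: appended to the plan, nothing runs yet.
        void declare(String operator) {
            declaredOperators.add(operator);
        }

        // Mirrors the quoted execute(): compile what exists now, then abort.
        void execute(String jobName) {
            executeCalls++;
            compiledPlan = new ArrayList<>(declaredOperators);
            throw new ProgramAbortException();
        }

        // Eager action, like count(): it needs a result, so it runs a job.
        long count(String dataSet) {
            declare("count(" + dataSet + ")");
            execute("count job");
            return -1; // never reached during plan extraction
        }
    }

    /** Shape of the question's program when no vertex count is passed. */
    static void userProgram(PlanCapturingEnvironment env) {
        env.declare("edges");
        env.declare("graph(edges)");                 // lazy
        long n = env.count("vertices");              // embedded job
        env.declare("pageRank(n=" + n + ")");        // never reached
        env.declare("print");
        env.execute("main job");
    }

    /** What the plan-extraction step ends up holding. */
    static PlanCapturingEnvironment extractPlan() {
        PlanCapturingEnvironment env = new PlanCapturingEnvironment();
        try {
            userProgram(env);
        } catch (ProgramAbortException e) {
            // extraction stops here and keeps whatever was compiled so far
        }
        return env;
    }

    public static void main(String[] args) {
        PlanCapturingEnvironment env = extractPlan();
        // prints: [edges, graph(edges), count(vertices)] after 1 execute call(s)
        System.out.println(env.compiledPlan + " after " + env.executeCalls + " execute call(s)");
    }
}
```

In this model the captured plan ends with the count, and the PageRank and print steps are never reached, which matches the vertex-count-only plan shown in the web interface.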
This is the code I used:
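```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.examples.data.PageRankData;
import org.apache.flink.graph.library.PageRank;

public class Job {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Graph<Long, Double, Double> graph = Graph.fromDataSet(
                PageRankData.getDefaultEdgeDataSet(env), new VertexInit(), env);
        // No vertex count is passed, so PageRank computes it internally
        graph.run(new PageRank<Long>(0.85, 10)).print();
    }

    private static class VertexInit implements MapFunction<Long, Double> {
        @Override
        public Double map(Long value) throws Exception { return 1.0; }
    }
}
```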
If the number of vertices is provided, e.g. `graph.run(new PageRank<Long>(0.85, 5, 10))`, there is no problem: the plan is correctly computed and PageRank is calculated.
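A minimal plain-Java model of the `numberOfVertices == 0` check illustrates why supplying the count changes the outcome. The classes and the `capturedJob` helper are hypothetical, not Flink's API; the model assumes, as described above, that the plan-extracting `execute()` captures the first job it sees and then aborts the program.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink code: which job plan does a plan-extracting
// environment capture with and without a supplied vertex count?
public class SuppliedVertexCountSketch {

    static class ProgramAbortException extends RuntimeException {}

    static class PlanCapturingEnvironment {
        final List<String> sinks = new ArrayList<>();
        String capturedJob;

        void execute(String jobName) {
            capturedJob = jobName + " " + sinks;  // compile what exists now
            throw new ProgramAbortException();    // ...and stop the program
        }
    }

    // numVertices == 0 means "unknown", mirroring the check in Gelly's PageRank.
    static void pageRankProgram(PlanCapturingEnvironment env, long numVertices) {
        if (numVertices == 0) {
            env.sinks.add("count(vertices)");
            env.execute("count job");             // eager count: embedded job
        }
        env.sinks.add("print(pageRank)");
        env.execute("main job");
    }

    static String capturedJob(long numVertices) {
        PlanCapturingEnvironment env = new PlanCapturingEnvironment();
        try {
            pageRankProgram(env, numVertices);
        } catch (ProgramAbortException expected) {
            // plan extraction always ends here
        }
        return env.capturedJob;
    }

    public static void main(String[] args) {
        System.out.println(capturedJob(0)); // count job [count(vertices)]
        System.out.println(capturedJob(5)); // main job [print(pageRank)]
    }
}
```

With a vertex count of `0` the only captured job is the embedded count job; with `5` the count is skipped, so the first `execute()` call belongs to the main job.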
My question is: what am I doing wrong? Or is this an actual bug in Flink?