Я создал простую задачу для Apache Flink, которая использует реализацию PageRank, поставляемую с Gelly.
Локально, внутри IDE, все нормально. Однако я попытался отправить JAR с моим заданием в экземпляр Flink, работающий на моем компьютере, с помощью веб-интерфейса JobManager. Но вместо того, чтобы получить правильный план для задания и выполнить PageRank, Flink представляет и выполняет очень странный план, который считает только количество вершин графа.
Я провел небольшое исследование и отладку и обнаружил, что реализация PageRank, поставляемая с Gelly, начинает вычислять количество вершин графа, когда оно не указано в качестве параметра алгоритма:
if (numberOfVertices == 0) {
numberOfVertices = network.numberOfVertices();
}
Этот расчет подразумевает встроенное задание. Поскольку операторы ленивы, вычисления не запускаются. На сервере Flink первое, что нужно сделать, это получить план работы. Это делается с помощью специальной среды OptimizerPlanEnvironment
, которая предоставляет следующий result
метод:
public JobExecutionResult execute(String jobName) throws Exception {
Plan plan = createProgramPlan(jobName);
this.optimizerPlan = compiler.compile(plan);
// do not go on with anything now!
throw new ProgramAbortException();
}
Проблема исходит отсюда. Как только выдается ProgramAbortException
, программа возвращает рассчитанный на данный момент план. Но был рассчитан только внутренний план работ, поэтому таким образом основной план работ никогда не вычисляется и не выполняется.
Это код, который я использовал:
public class Job {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Double, Double> graph = Graph.fromDataSet(
PageRankData.getDefaultEdgeDataSet(env), new VertexInit(), env);
graph.run(new PageRank<Long>(0.85, 10)).print();
}
private static class VertexInit implements MapFunction<Long, Double> {
@Override
public Double map(Long value) throws Exception { return 1.0; }
}
}
Если указано количество вершин, выполняя, например, graph.run(new PageRank<Long>(0.85, 5, 10))
, проблем нет, план рассчитан правильно и подсчитывается PageRank.
У меня вопрос: что я делаю не так? Или это какая-то актуальная ошибка во Flink?