Apache Flink создает неверный план

Я создал простую задачу для Apache Flink, которая использует реализацию PageRank, поставляемую с Gelly.

Локально, внутри IDE, все нормально. Однако я попытался отправить JAR с моим заданием в экземпляр Flink, работающий на моем компьютере, с помощью веб-интерфейса JobManager. Но вместо того, чтобы получить правильный план для задания и выполнить PageRank, Flink представляет и выполняет очень странный план, который считает только количество вершин графа.

Я провел небольшое исследование и отладку и обнаружил, что реализация PageRank, поставляемая с Gelly, начинает вычислять количество вершин графа, когда оно не указано в качестве параметра алгоритма:

if (numberOfVertices == 0) {
    numberOfVertices = network.numberOfVertices();
}

Этот расчет подразумевает встроенное задание. Поскольку операторы ленивы, вычисления не запускаются. На сервере Flink первое, что нужно сделать, это получить план работы. Это делается с помощью специальной среды OptimizerPlanEnvironment, которая предоставляет следующий result метод:

public JobExecutionResult execute(String jobName) throws Exception {
    Plan plan = createProgramPlan(jobName);
    this.optimizerPlan = compiler.compile(plan);

    // do not go on with anything now!
    throw new ProgramAbortException();
}

Проблема исходит отсюда. Как только выдается ProgramAbortException, программа возвращает рассчитанный на данный момент план. Но был рассчитан только внутренний план работ, поэтому таким образом основной план работ никогда не вычисляется и не выполняется.

Это код, который я использовал:

public class Job {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Graph<Long, Double, Double> graph = Graph.fromDataSet(
            PageRankData.getDefaultEdgeDataSet(env), new VertexInit(), env);
        graph.run(new PageRank<Long>(0.85, 10)).print();
    }

    private static class VertexInit implements MapFunction<Long, Double> {
        @Override
        public Double map(Long value) throws Exception { return 1.0; }
    }
}

Если указано количество вершин, выполняя, например, graph.run(new PageRank<Long>(0.85, 5, 10)), проблем нет, план рассчитан правильно и подсчитывается PageRank.

У меня вопрос: что я делаю не так? Или это какая-то актуальная ошибка во Flink?


person Renato Rosa    schedule 14.03.2016    source источник
comment
Это вопрос о нарушенной визуализации плана или невыполнении программы? Правильно ли выполняется программа при выполнении JAR на работающем экземпляре или это тоже не удается?   -  person Fabian Hueske    schedule 14.03.2016
comment
Это тоже не удается. Визуализируется план, который выполняется.   -  person Renato Rosa    schedule 15.03.2016


Ответы (1)


Проблема, как вы заявили, в том, что network.numberOfVertices внутренне вызывает count в наборе данных вершин. Это запускает независимое задание Flink, которое вычисляет значение счетчика. Это значение обычно извлекается методом main. Однако в случае отправки веб-клиента это не сработает из-за OptimizerPlanEnvironment, который позволяет скомпилировать только одно задание Flink. Поведение аналогично режиму отсоединенного выполнения, который также не поддерживает выполнение активного плана.

На данный момент это ограничение веб-клиента Flink. Причина такого поведения в том, что Flink не хочет блокировать поток обработчика канала Netty, который должен был бы дождаться результата операции count. Операция блокировки приведет к истощению пула потоков и сделает веб-интерфейс для этого сеанса не отвечающим, пока он не будет разблокирован.

person Till Rohrmann    schedule 14.03.2016
comment
Итак, при использовании интерфейса командной строки такой проблемы возникнуть не должно, не так ли? Я сейчас это проверяю. - person Renato Rosa; 15.03.2016
comment
Да, он не должен подвести. Только если вы отправляете задание в отдельном режиме. Но тогда вы должны получить исключение, сообщающее вам именно об этом. - person Till Rohrmann; 15.03.2016