Есть ли способ гарантировать, что все CheckpointListeners уведомлены о завершении контрольной точки на Flink при отмене задания с помощью точки сохранения?

Я использую flink 1.9 и REST API /jobs/:jobid/savepoints для запуска точки сохранения и отмены задания (корректно остановите задание для последующего запуска из точки сохранения).

Я использую двухфазную фиксацию в исходной функции, поэтому мой источник реализует как CheckpointedFunction, так и CheckpointListener интерфейсы. При snapshotState() вызове метода я делаю снимок внутреннего состояния и notifyCheckpointComplete() состояние контрольной точки в стороннюю систему.

Из того, что я вижу из исходного кода, только часть snapshotState() синхронна в CheckpointCoordinator -

// send the messages to the tasks that trigger their checkpoint
                for (Execution execution: executions) {
                    if (props.isSynchronous()) {
                        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
                    } else {
                        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                    }
                }

Подтверждение контрольной точки и уведомление о завершении являются асинхронными в AsyncCheckpointRunnable.

При этом, когда запускается savepoint с cancel-job, установленным на true, после создания моментального снимка некоторые диспетчеры задач продолжают получать уведомление о завершении до отмены задания и выполнения notifyCheckpointComplete(), а некоторые нет.

Вопрос в том, есть ли способ отменить задание с помощью точки сохранения, чтобы notifyCheckpointComplete() гарантированно вызывался всеми диспетчерами задач до отмены задания, или в настоящий момент нет способа добиться этого?


person Mikalai Lushchytski    schedule 07.08.2020    source источник


Ответы (2)


Разве использование остановки с точкой сохранения [1] [2] не решило бы проблему?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-stop [2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html

person Kostas Kloudas    schedule 10.08.2020
comment
По моим наблюдениям, уведомление о завершении контрольной точки вызывается асинхронно, и не все диспетчеры задач получают это уведомление до его отмены. - person Mikalai Lushchytski; 10.08.2020
comment
Я ожидал, что все задачи, реализующие интерфейс CheckpointListener, получат уведомление о завершении контрольной точки в случае stop-wiith-savepoint. Это то, что гарантирует, что все приемники также будут переданы внешним системам, используемым в качестве приемников. Наблюдали ли вы за конкретным типом задач, которого нет? - person Kostas Kloudas; 11.08.2020
comment
да. В исходном коде функция notifyCheckpointComplete иногда не вызывается. - person Mikalai Lushchytski; 11.08.2020
comment
Спасибо, Костас! Ты прав! Я использовал другую конечную точку - точку сохранения с отменой, которая, как мне казалось, похожа на точку остановки с точкой сохранения. Однако копаясь в коде, выясняется, что точка остановки с точкой сохранения является синхронной, а точка сохранения с отменой - асинхронной. Итак, переход на остановку с точкой сохранения решил проблему. - person Mikalai Lushchytski; 12.08.2020
comment
Приятно слышать! - person Kostas Kloudas; 12.08.2020

Прошло много времени с тех пор, как я смотрел Flink 1.9, поэтому, пожалуйста, отнеситесь к моему ответу с некоторой осторожностью.

Я предполагаю, что ваши источники отменяют слишком рано. Таким образом, notifyCheckpointComplete фактически отправляется всем задачам, но некоторые SourceFunction уже вышли из run, и соответствующая задача очищена.

Afaik, то, что вы описали, должно быть возможным, если вы игнорируете отмену и прерывания, пока не получите последний notifyCheckpointComplete.

class YourSource implements SourceFunction<Object>, CheckpointListener, CheckpointedFunction {
    private volatile boolean canceled = false;
    private volatile boolean pendingCheckpoint = false;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        pendingCheckpoint = true;
        // start two-phase commit
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {

    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // finish two-phase commit
        pendingCheckpoint = false;
    }

    @Override
    public void run(SourceContext<Object> ctx) throws Exception {
        while (!canceled) {
            // do normal source stuff
        }
        // keep the task running after cancellation
        while (pendingCheckpoint) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                // ignore interruptions until two-phase commit is done
            }
        }
    }

    @Override
    public void cancel() {
        canceled = true;
    }
}
person Arvid Heise    schedule 07.08.2020