Я использую flink 1.9 и REST API /jobs/:jobid/savepoints
для запуска точки сохранения и отмены задания (корректно остановите задание для последующего запуска из точки сохранения).
Я использую двухфазную фиксацию в исходной функции, поэтому мой источник реализует как CheckpointedFunction
, так и CheckpointListener
интерфейсы. При snapshotState()
вызове метода я делаю снимок внутреннего состояния и notifyCheckpointComplete()
состояние контрольной точки в стороннюю систему.
Из того, что я вижу из исходного кода, только часть snapshotState()
синхронна в CheckpointCoordinator
-
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
if (props.isSynchronous()) {
execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
} else {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
}
Подтверждение контрольной точки и уведомление о завершении являются асинхронными в AsyncCheckpointRunnable
.
При этом, когда запускается savepoint
с cancel-job
, установленным на true
, после создания моментального снимка некоторые диспетчеры задач продолжают получать уведомление о завершении до отмены задания и выполнения notifyCheckpointComplete()
, а некоторые нет.
Вопрос в том, есть ли способ отменить задание с помощью точки сохранения, чтобы notifyCheckpointComplete()
гарантированно вызывался всеми диспетчерами задач до отмены задания, или в настоящий момент нет способа добиться этого?