Почему flink не может восстановиться из точки сохранения

версия flink 1.7

Я пытаюсь восстановить задание flink из точки сохранения (или контрольной точки), то что делает задание, это чтение из kafka - ›выполнение агрегации 30-минутного окна (как счетчик) -› погружение в kafka.

Я использую RockDB и включенную контрольную точку.

теперь я пытаюсь запустить точку сохранения вручную. ожидаемое значение каждого агрегированного - 30 (1 данные в минуту). но когда я восстанавливаюсь из точки сохранения (flink run -d -s {url}), агрегированное значение не равно 30 (меньше 30, зависит от времени, когда я отменяю задание flink и восстанавливаю). когда задание выполняется нормально, оно получает 30.

я не знаю, почему некоторые данные могут быть потеряны?

и журнал показывает состояние Нет восстановления для FlinkKafkaConsumer

основной код:

        source.flatMap(new FlatMapFunction<String, Model>() {
        private static final long serialVersionUID = 5814342517597371470L;

        @Override
        public void flatMap(String value, Collector<Model> out) throws Exception {
            LOGGER.info("----> catch value: " + value);
            Model model =  JSONObject.parseObject(value, Model.class);
            out.collect(model);
        }
    }).uid("flatmap-1").name("flatmap-1").assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Model>() {

        private static final long serialVersionUID = -1742071229344039681L;

        @Override
        public long extractTimestamp(Model element, long previousElementTimestamp) {
            return element.getTime();
        }

        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(Model lastElement, long extractedTimestamp) {
            return new Watermark(extractedTimestamp);
        }
    }).setParallelism(1).keyBy(Model::getDim).window(new DynamicWindowAssigner()).aggregate(new AggregateFunction<Model, Model, Model>() {
        @Override
        public Model createAccumulator() {
            return new Model();
        }

        @Override
        public Model add(Model value, Model accumulator) {
            init(value, accumulator);
            accumulator.setValue(accumulator.getValue() + 1);
            return accumulator;
        }

        @Override
        public Model getResult(Model accumulator) {
            return accumulator;
        }

        @Override
        public Model merge(Model a, Model b) {
            return null;
        }

        private void init(Model value, Model accumulator){
            if(accumulator.getTime() == 0L){
                accumulator.setValue(0);
                accumulator.setDim(value.getDim());
                accumulator.setTime(value.getTime());
            }
        }
    }).uid("agg-1").name("agg-1").map(new MapFunction<Model, String>() {
        private static final long serialVersionUID = -1742071229344039681L;

        @Override
        public String map(Model value) throws Exception {
            value.setTime(TimeWindow.getWindowStartWithOffset(value.getTime(), 0, TimeUnit.MINUTES.toMillis(30)));
            return JSONObject.toJSONString(value);
        }
    }).uid("flatmap-2").name("flatmap-2").setParallelism(4).addSink(metricProducer).uid("sink").name("sink").setParallelism(2);

настройки контрольной точки:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(60000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(120000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(50000);
    StateBackend stateBackend = new RocksDBStateBackend(${path}, true);
    env.setStateBackend(stateBackend);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getConfig().disableSysoutLogging();

person fightchwang    schedule 26.05.2021    source источник
comment
Состояние восстановления не регистрируется только тогда, когда контрольная точка или точка сохранения не используются для инициализации состояния задания, что объясняет, почему вы видите неверные результаты. Я не вижу ничего явно неправильного в том, чем вы поделились, но, возможно, если вы поделитесь более подробной информацией, проблема станет очевидной. (Кроме того, вы используете Flink 1.7.2 или более раннюю версию?)   -  person David Anderson    schedule 26.05.2021
comment
@ Дэвид Андерсон спасибо, я использую v1.7.1   -  person fightchwang    schedule 27.05.2021
comment
@David Anderson и я проверили каталог hdfs точки сохранения, кажется, что он содержит _meta и другие каталоги (которые сохраняют состояние потока и смещение kafka?)   -  person fightchwang    schedule 27.05.2021
comment
код был обновлен, и кластер flink состоит из 3 мастеров, нескольких подчиненных узлов, время выполнения flink - flink-dist_2.11-1.7.1.jar   -  person fightchwang    schedule 27.05.2021


Ответы (1)


наконец, оказывается, что я должен использовать flink run -s {savepoint} -d xxx.jar вместо flink run -d xxx.jar -s {savepoint}, если флаг -d стоит перед флагом -s, затем игнорировать flink - как-то

person fightchwang    schedule 28.05.2021