Размер контрольных точек Flink превышает 20 ГБ, а время контрольных точек занимает более 1 минуты.

В первую очередь:

  • Я как бы новичок в Flink (понимаю принцип и могу создать любую базовую потоковую работу, которая мне нужна)
  • Я использую Kinesis Analytics для запуска своего задания Flink, и по умолчанию он использует инкрементную контрольную точку с интервалом в 1 минуту.
  • Задание Flink считывает событие из потока Kinesis с помощью FlinkKinesisConsumer и настраиваемого deserailzer (десериализация байта в простой объект Java, который используется во всем задании)

Я хотел бы просто подсчитать, сколько событий ENTITY_ID / FOO и ENTITY_ID / BAR произошло за последние 24 часа. Важно, чтобы этот подсчет был как можно более точным, и именно поэтому я использую эту функцию Flink вместо того, чтобы сам подсчитывать текущую сумму в 5-минутном окне кувырка. Я также хочу иметь возможность иметь счетчик событий TOTAL с самого начала (а не только за последние 24 часа), поэтому я также выводю в результате количество событий за последние 5 минут, чтобы приложение обработки сообщений могло просто берет эти 5 минут данных и вычисляет текущую сумму. (Этот подсчет не обязательно должен быть точным, и это нормально, если произойдет отключение, и я потеряю счет)

Эта работа хорошо работала до прошлой недели, когда у нас был всплеск (в 10 раз больше) трафика. С этого момента Flink стал бананом. Размер контрольной точки начал медленно расти с ~ 500 МБ до 20 ГБ, а время контрольной точки занимало около 1 минуты и со временем увеличивалось. Приложение начало давать сбой и так и не смогло полностью восстановиться, а возрастающий возраст итератора событий никогда не снижался, поэтому новые события не использовались.

Поскольку я новичок в Flink, я не совсем уверен, что способ, которым я делаю скользящий подсчет, полностью не оптимизирован или просто неверен.

Это небольшой фрагмент ключевой части кода:

Источник (MyJsonDeserializationSchema расширяет AbstractDeserializationSchema и просто считывает байт и создает объект Event):

SourceFunction<Event> source =
      new FlinkKinesisConsumer<>("input-kinesis-stream", new MyJsonDeserializationSchema(), kinesisConsumerConfig);

Событие ввода, простое java pojo, которое будет использоваться в операторах Flink:

public class Event implements Serializable {
  public String entityId;
  public String entityType;
  public String entityName;
  public long eventTimestamp = System.currentTimeMillis();
}

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> eventsStream = kinesis
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(30)) {
        @Override
        public long extractTimestamp(Event event) {
          return event.eventTimestamp;
        }
      })

DataStream<Event> fooStream = eventsStream
      .filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event event) throws Exception {
          return "foo".equalsIgnoreCase(event.entityType);
        }
      })

 DataStream<Event> barStream = eventsStream
      .filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event event) throws Exception {
          return "bar".equalsIgnoreCase(event.entityType);
        }
      })


StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    Table fooTable = tEnv.fromDataStream("fooStream, entityId, entityName, entityType, eventTimestame.rowtime");
    tEnv.registerTable("Foo", fooTable);
    Table barTable = tEnv.fromDataStream("barStream, entityId, entityName, entityType, eventTimestame.rowtime");
    tEnv.registerTable("Bar", barTable);

Table slidingFooCountTable = fooTable
      .window(Slide.over("24.hour").every("5.minute").on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId, entityName, minuteWindow")
      .select("concat(concat(entityId,'_'), entityName) as slidingFooId, entityid as slidingFooEntityid, entityName as slidingFooEntityName, entityType.count as slidingFooCount, minuteWindow.rowtime as slidingFooMinute");

Table slidingBarCountTable = barTable
      .window(Slide.over("24.hout").every("5.minute").on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId, entityName, minuteWindow")
      .select("concat(concat(entityId,'_'), entityName) as slidingBarId, entityid as slidingBarEntityid, entityName as slidingBarEntityName, entityType.count as slidingBarCount, minuteWindow.rowtime as slidingBarMinute");

    Table tumblingFooCountTable = fooTable
      .window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityid, entityName, minuteWindow")
      .select("concat(concat(entityName,'_'), entityName) as tumblingFooId, entityId as tumblingFooEntityId, entityNamae as tumblingFooEntityName, entityType.count as tumblingFooCount, minuteWindow.rowtime as tumblingFooMinute");
   
    Table tumblingBarCountTable = barTable
      .window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityid, entityName, minuteWindow")
      .select("concat(concat(entityName,'_'), entityName) as tumblingBarId, entityId as tumblingBarEntityId, entityNamae as tumblingBarEntityName, entityType.count as tumblingBarCount, minuteWindow.rowtime as tumblingBarMinute");

    Table aggregatedTable = slidingFooCountTable
      .leftOuterJoin(slidingBarCountTable, "slidingFooId = slidingBarId && slidingFooMinute = slidingBarMinute")
      .leftOuterJoin(tumblingFooCountTable, "slidingFooId = tumblingBarId && slidingFooMinute = tumblingBarMinute")
      .leftOuterJoin(tumblingFooCountTable, "slidingFooId = tumblingFooId && slidingFooMinute = tumblingFooMinute")
      .select("slidingFooMinute as timestamp, slidingFooCreativeId as entityId, slidingFooEntityName as entityName, slidingFooCount, slidingBarCount, tumblingFooCount, tumblingBarCount");

    DataStream<Result> result = tEnv.toAppendStream(aggregatedTable, Result.class);
    result.addSink(sink); // write to an output stream to be picked up by a lambda function

Я был бы очень признателен, если бы кто-нибудь с большим опытом работы с Flink прокомментировал мой способ подсчета? Мой код полностью перестроен? Есть ли лучший и более эффективный способ подсчета событий за 24-часовой период?

Я где-то читал в Stackoverflow @DavidAnderson, где предлагалось создать собственное скользящее окно, используя состояние карты и разрезая событие по метке времени. Однако я не совсем уверен, что это означает, и я не нашел ни одного примера кода, чтобы показать это.


person Marco    schedule 11.10.2020    source источник
comment
Похоже, хороший вопрос для форума разработчиков AWS или службы поддержки AWS. Имхо, возможно, вы захотите присоединиться к слишком большому количеству событий из одного окна, это может создать слишком большое состояние или исчерпать память. Это всего лишь возможность (без журналов доказательств нет), и я не уверен, что может быть хорошим решением.   -  person gusto2    schedule 12.10.2020
comment
AWS также участвует в этом, но я подозреваю, что проблема заключается в устранении проблемы. Если посмотреть на показатели, процессор и память кучи, то на самом деле он никогда не превышает 50%. Это может быть проблема, да, но гораздо больше шансов, что проблема в моей работе   -  person Marco    schedule 12.10.2020


Ответы (1)


Вы создаете там довольно много окон. Если вы создаете скользящее окно размером 24 часа и слайд 5 минут, это означает, что там будет много открытых окон, поэтому вы можете ожидать, что все данные, которые вы получили в данный день, будут отмечены контрольной точкой. хотя бы одно окно, если задуматься. Таким образом, несомненно, что размер и время контрольной точки будут расти по мере роста самих данных.

Чтобы получить ответ, можно ли переписать код, Вам нужно будет предоставить здесь более подробную информацию о том, чего именно Вы пытаетесь достичь.

person Dominik Wosiński    schedule 11.10.2020
comment
Я обновил вопрос, чтобы прояснить конечную цель - person Marco; 12.10.2020
comment
Итак, из того, что Вы говорите, не совсем понятно, почему Вы здесь используете скользящее окно. Если Вы хотите произвести полный подсчет всех прибывших элементов, почему бы Вам просто не пойти с окном вращения в 24 часа? - person Dominik Wosiński; 12.10.2020
comment
Причину этого тоже добавили в пост. Мы не хотим утруждать себя какой-либо сложной пост-обработкой, чтобы создать собственное скользящее окно. Мы хотим воспользоваться преимуществами флинка, который делает его для нас красивым и чистым. Перевернутое окно просто будет использоваться, чтобы более или менее измерить, сколько событий у нас было с начала, нас не волнует, испорчен ли этот счет из-за простоя или ошибок. - person Marco; 12.10.2020
comment
Stream-in - ›Flink -› Stream-out - ›lambda -› Redis Объект Result записывается в Sink, и лямбда подбирает его, чтобы выполнить дополнительную обработку и сохранить в кеше. Для этого мы используем 24-часовой скользящий счетчик. немного статистики для другого приложения. - person Marco; 12.10.2020
comment
Итак, вы хотите, чтобы это постоянно обновлялось (дневная сумма), а не только один раз в день, верно? - person Dominik Wosiński; 12.10.2020
comment
Точно, мы хотим, чтобы кеш обновлялся каждые 5 минут с учетом количества за последние 24 часа (отсюда скользящее окно 24 часа с 5-минутным слайдом) - person Marco; 12.10.2020