В первую очередь:
- Я как бы новичок в Flink (понимаю принцип и могу создать любую базовую потоковую работу, которая мне нужна)
- Я использую Kinesis Analytics для запуска своего задания Flink, и по умолчанию он использует инкрементную контрольную точку с интервалом в 1 минуту.
- Задание Flink считывает событие из потока Kinesis с помощью FlinkKinesisConsumer и настраиваемого deserailzer (десериализация байта в простой объект Java, который используется во всем задании)
Я хотел бы просто подсчитать, сколько событий ENTITY_ID / FOO и ENTITY_ID / BAR произошло за последние 24 часа. Важно, чтобы этот подсчет был как можно более точным, и именно поэтому я использую эту функцию Flink вместо того, чтобы сам подсчитывать текущую сумму в 5-минутном окне кувырка. Я также хочу иметь возможность иметь счетчик событий TOTAL с самого начала (а не только за последние 24 часа), поэтому я также выводю в результате количество событий за последние 5 минут, чтобы приложение обработки сообщений могло просто берет эти 5 минут данных и вычисляет текущую сумму. (Этот подсчет не обязательно должен быть точным, и это нормально, если произойдет отключение, и я потеряю счет)
Эта работа хорошо работала до прошлой недели, когда у нас был всплеск (в 10 раз больше) трафика. С этого момента Flink стал бананом. Размер контрольной точки начал медленно расти с ~ 500 МБ до 20 ГБ, а время контрольной точки занимало около 1 минуты и со временем увеличивалось. Приложение начало давать сбой и так и не смогло полностью восстановиться, а возрастающий возраст итератора событий никогда не снижался, поэтому новые события не использовались.
Поскольку я новичок в Flink, я не совсем уверен, что способ, которым я делаю скользящий подсчет, полностью не оптимизирован или просто неверен.
Это небольшой фрагмент ключевой части кода:
Источник (MyJsonDeserializationSchema расширяет AbstractDeserializationSchema и просто считывает байт и создает объект Event):
SourceFunction<Event> source =
new FlinkKinesisConsumer<>("input-kinesis-stream", new MyJsonDeserializationSchema(), kinesisConsumerConfig);
Событие ввода, простое java pojo, которое будет использоваться в операторах Flink:
public class Event implements Serializable {
public String entityId;
public String entityType;
public String entityName;
public long eventTimestamp = System.currentTimeMillis();
}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Event> eventsStream = kinesis
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(30)) {
@Override
public long extractTimestamp(Event event) {
return event.eventTimestamp;
}
})
DataStream<Event> fooStream = eventsStream
.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event event) throws Exception {
return "foo".equalsIgnoreCase(event.entityType);
}
})
DataStream<Event> barStream = eventsStream
.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event event) throws Exception {
return "bar".equalsIgnoreCase(event.entityType);
}
})
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Table fooTable = tEnv.fromDataStream("fooStream, entityId, entityName, entityType, eventTimestame.rowtime");
tEnv.registerTable("Foo", fooTable);
Table barTable = tEnv.fromDataStream("barStream, entityId, entityName, entityType, eventTimestame.rowtime");
tEnv.registerTable("Bar", barTable);
Table slidingFooCountTable = fooTable
.window(Slide.over("24.hour").every("5.minute").on("eventTimestamp").as("minuteWindow"))
.groupBy("entityId, entityName, minuteWindow")
.select("concat(concat(entityId,'_'), entityName) as slidingFooId, entityid as slidingFooEntityid, entityName as slidingFooEntityName, entityType.count as slidingFooCount, minuteWindow.rowtime as slidingFooMinute");
Table slidingBarCountTable = barTable
.window(Slide.over("24.hout").every("5.minute").on("eventTimestamp").as("minuteWindow"))
.groupBy("entityId, entityName, minuteWindow")
.select("concat(concat(entityId,'_'), entityName) as slidingBarId, entityid as slidingBarEntityid, entityName as slidingBarEntityName, entityType.count as slidingBarCount, minuteWindow.rowtime as slidingBarMinute");
Table tumblingFooCountTable = fooTable
.window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
.groupBy("entityid, entityName, minuteWindow")
.select("concat(concat(entityName,'_'), entityName) as tumblingFooId, entityId as tumblingFooEntityId, entityNamae as tumblingFooEntityName, entityType.count as tumblingFooCount, minuteWindow.rowtime as tumblingFooMinute");
Table tumblingBarCountTable = barTable
.window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
.groupBy("entityid, entityName, minuteWindow")
.select("concat(concat(entityName,'_'), entityName) as tumblingBarId, entityId as tumblingBarEntityId, entityNamae as tumblingBarEntityName, entityType.count as tumblingBarCount, minuteWindow.rowtime as tumblingBarMinute");
Table aggregatedTable = slidingFooCountTable
.leftOuterJoin(slidingBarCountTable, "slidingFooId = slidingBarId && slidingFooMinute = slidingBarMinute")
.leftOuterJoin(tumblingFooCountTable, "slidingFooId = tumblingBarId && slidingFooMinute = tumblingBarMinute")
.leftOuterJoin(tumblingFooCountTable, "slidingFooId = tumblingFooId && slidingFooMinute = tumblingFooMinute")
.select("slidingFooMinute as timestamp, slidingFooCreativeId as entityId, slidingFooEntityName as entityName, slidingFooCount, slidingBarCount, tumblingFooCount, tumblingBarCount");
DataStream<Result> result = tEnv.toAppendStream(aggregatedTable, Result.class);
result.addSink(sink); // write to an output stream to be picked up by a lambda function
Я был бы очень признателен, если бы кто-нибудь с большим опытом работы с Flink прокомментировал мой способ подсчета? Мой код полностью перестроен? Есть ли лучший и более эффективный способ подсчета событий за 24-часовой период?
Я где-то читал в Stackoverflow @DavidAnderson, где предлагалось создать собственное скользящее окно, используя состояние карты и разрезая событие по метке времени. Однако я не совсем уверен, что это означает, и я не нашел ни одного примера кода, чтобы показать это.