Я хочу сопоставить шаблон CEP в Flink 1.4.0 Streaming со следующим кодом:
DataStream<Event> input = inputFromSocket.map(new IncomingMessageProcessor()).filter(new FilterEmptyAndInvalidEvents());
DataStream<Event> inputFiltered = input.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
KeyedStream<Event, String> partitionedInput = inputFiltered.keyBy(new MyKeySelector());
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.where(new ActionCondition("action1"))
.followedBy("middle").where(new ActionCondition("action2"))
.followedBy("end").where(new ActionCondition("action3"));
pattern = pattern.within(Time.seconds(30));
PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
Event
это просто POJO
public class Event {
private UUID id;
private String action;
private String senderID;
private long occurrenceTimeStamp;
......
}
который извлекается из моего пользовательского источника (Google PubSub). Первый фильтр FilterEmptyAndInvalidEvents()
просто фильтрует события с неправильным форматированием и т. Д., Но в данном случае этого не происходит. Я могу проверить это по выходным данным журнала. Таким образом, каждое событие проходит через метод MyKeySelector.getKey()
.
BoundedOutOfOrdneressGenerator
извлекает только временную метку из одного поля:
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Event> {
private static Logger LOG = LoggerFactory.getLogger(BoundedOutOfOrdernessGenerator.class);
private final long maxOutOfOrderness = 5500; // 5.5 seconds
private long currentMaxTimestamp;
@Override
public long extractTimestamp(Event element, long previousElementTimestamp) {
long timestamp = element.getOccurrenceTimeStamp();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current highest timestamp minus the out-of-orderness bound
Watermark newWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
return newWatermark;
}
}
MyKeySelector
просто извлекает строковое значение из поля:
public class MyKeySelector implements KeySelector<Event, String> {
private static Logger LOG = LoggerFactory.getLogger(MyKeySelector.class);
@Override
public String getKey(Event value) throws Exception {
String senderID = value.getSenderID();
LOG.info("Partioning event {} by key {}", value, senderID);
return senderID;
}
}
ActionCondition
здесь просто сравнивает одно поле в событиях и выглядит так:
public class ActionCondition extends SimpleCondition<Event> {
private static Logger LOG = LoggerFactory.getLogger(ActionCondition.class);
private String filterForCommand = "";
public ActionCondition(String filterForCommand) {
this.filterForCommand = filterForCommand;
}
@Override
public boolean filter(Event value) throws Exception {
LOG.info("Filtering event for {} action: {}", filterForCommand, value);
if (value == null) {
return false;
}
if (value.getAction() == null) {
return false;
}
if (value.getAction().equals(filterForCommand)) {
LOG.info("It's a hit for the {} action for event {}", filterForCommand, value);
return true;
} else {
LOG.info("It's a miss for the {} action for event {}", filterForCommand, value);
return false;
}
}
}
К сожалению, при запуске задания и отправке событий, которые должны совпадать с шаблоном, они принимаются и разделяются правильно, но шаблон CEP не соответствует.
В качестве примера я отправляю следующие события:
- действие1
- действие2
- действие3
В выводе журнала задания Flink я вижу, что события правильно обрабатываются методом MyKeySelector.getKey()
, поскольку я добавил туда вывод журнала. Таким образом, кажется, что события отображаются в потоке правильно, но, к сожалению, они не соответствуют шаблону.
Выходные данные журнала выглядят следующим образом:
FilterEmptyAndInvalidEvents - Letting event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 through
MyKeySelector - Partioning event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 by key RHHLWUi8sXH33AJIAAAA
FilterEmptyAndInvalidEvents - Letting event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 through
MyKeySelector - Partioning event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 by key RHHLWUi8sXH33AJIAAAA
FilterEmptyAndInvalidEvents - Letting event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 through
MyKeySelector - Partioning event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector - Partioning event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector - Partioning event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector - Partioning event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 by key RHHLWUi8sXH33AJIAAAA
TimeCharacteristic установлен на EventTime через
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
и события содержат правильную метку времени.
Если я сейчас отправлю еще 3 события с действиями (но с новой меткой времени и т. Д.)
- действие1
- действие2
- действие3
шаблон соответствует первому набору событий. Я знаю, что он соответствует первому набору событий, поскольку для целей отладки я пометил каждое событие идентификатором guid и распечатал его для сопоставленного.
При отправке 3-го, 4-го, ... набора из этих 3 событий всегда выполняется сопоставление предыдущего набора событий. Так что, похоже, есть своего рода «смещение» в обнаружении паттернов. Однако это не похоже на проблему времени, поскольку первый набор событий также не совпадает, если я жду долго после его отправки (и вижу, что события разделяются с помощью Flink).
Что-то не так с моим кодом или почему мигание всегда соответствует только предыдущему набору событий с шаблоном?