Шаблон Flink CEP не соответствует первым событиям после запуска задания и всегда соответствует предыдущим установленным событиям

Я хочу сопоставить шаблон CEP в Flink 1.4.0 Streaming со следующим кодом:

    DataStream<Event> input = inputFromSocket.map(new IncomingMessageProcessor()).filter(new FilterEmptyAndInvalidEvents());

    DataStream<Event> inputFiltered = input.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
    KeyedStream<Event, String> partitionedInput = inputFiltered.keyBy(new MyKeySelector());

    Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new ActionCondition("action1"))
    .followedBy("middle").where(new ActionCondition("action2"))
    .followedBy("end").where(new ActionCondition("action3"));

    pattern = pattern.within(Time.seconds(30));

    PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

Event это просто POJO

public class Event {
    private UUID id;
    private String action;
    private String senderID;
    private long occurrenceTimeStamp;
    ......
}

который извлекается из моего пользовательского источника (Google PubSub). Первый фильтр FilterEmptyAndInvalidEvents() просто фильтрует события с неправильным форматированием и т. Д., Но в данном случае этого не происходит. Я могу проверить это по выходным данным журнала. Таким образом, каждое событие проходит через метод MyKeySelector.getKey().

BoundedOutOfOrdneressGenerator извлекает только временную метку из одного поля:

public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Event> {
    private static Logger LOG = LoggerFactory.getLogger(BoundedOutOfOrdernessGenerator.class);
    private final long maxOutOfOrderness = 5500; // 5.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        long timestamp = element.getOccurrenceTimeStamp();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        Watermark newWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        return newWatermark;
    }
}

MyKeySelector просто извлекает строковое значение из поля:

public class MyKeySelector implements KeySelector<Event, String> {
    private static Logger LOG = LoggerFactory.getLogger(MyKeySelector.class);

    @Override
    public String getKey(Event value) throws Exception {
        String senderID = value.getSenderID();
        LOG.info("Partioning event {} by key {}", value, senderID);
        return senderID;
    }
}

ActionCondition здесь просто сравнивает одно поле в событиях и выглядит так:

public class ActionCondition extends SimpleCondition<Event> {
    private static Logger LOG = LoggerFactory.getLogger(ActionCondition.class);

    private String filterForCommand = "";

    public ActionCondition(String filterForCommand) {
        this.filterForCommand = filterForCommand;
    }

    @Override
    public boolean filter(Event value) throws Exception {
        LOG.info("Filtering event for {} action: {}", filterForCommand, value);

        if (value == null) {
            return false;
        }

        if (value.getAction() == null) {
            return false;
        }

        if (value.getAction().equals(filterForCommand)) {
            LOG.info("It's a hit for the {} action for event {}", filterForCommand, value);
            return true;
        } else {
            LOG.info("It's a miss for the {} action for event {}", filterForCommand, value);
            return false;
        }
    }
}

К сожалению, при запуске задания и отправке событий, которые должны совпадать с шаблоном, они принимаются и разделяются правильно, но шаблон CEP не соответствует.

В качестве примера я отправляю следующие события:

  1. действие1
  2. действие2
  3. действие3

В выводе журнала задания Flink я вижу, что события правильно обрабатываются методом MyKeySelector.getKey(), поскольку я добавил туда вывод журнала. Таким образом, кажется, что события отображаются в потоке правильно, но, к сожалению, они не соответствуют шаблону.

Выходные данные журнала выглядят следующим образом:

FilterEmptyAndInvalidEvents   - Letting event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 through
MyKeySelector  - Partioning event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 by key RHHLWUi8sXH33AJIAAAA
FilterEmptyAndInvalidEvents   - Letting event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 through
MyKeySelector  - Partioning event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 by key RHHLWUi8sXH33AJIAAAA
FilterEmptyAndInvalidEvents   - Letting event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 through
MyKeySelector  - Partioning event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector  - Partioning event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector  - Partioning event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector  - Partioning event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 by key RHHLWUi8sXH33AJIAAAA

TimeCharacteristic установлен на EventTime через

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

и события содержат правильную метку времени.

Если я сейчас отправлю еще 3 события с действиями (но с новой меткой времени и т. Д.)

  1. действие1
  2. действие2
  3. действие3

шаблон соответствует первому набору событий. Я знаю, что он соответствует первому набору событий, поскольку для целей отладки я пометил каждое событие идентификатором guid и распечатал его для сопоставленного.

При отправке 3-го, 4-го, ... набора из этих 3 событий всегда выполняется сопоставление предыдущего набора событий. Так что, похоже, есть своего рода «смещение» в обнаружении паттернов. Однако это не похоже на проблему времени, поскольку первый набор событий также не совпадает, если я жду долго после его отправки (и вижу, что события разделяются с помощью Flink).

Что-то не так с моим кодом или почему мигание всегда соответствует только предыдущему набору событий с шаблоном?


person sceee    schedule 08.02.2018    source источник
comment
Не могли бы вы расширить свой пример метками времени и тем, как вы их извлекаете и генерируете водяной знак? Лучше всего, если бы вы могли предоставить полный минимальный пример, провоцирующий проблему.   -  person Dawid Wysakowicz    schedule 09.02.2018
comment
@DawidWysakowicz Спасибо за ваш ответ, я отредактировал пример, чтобы включить все ключевые части.   -  person sceee    schedule 09.02.2018


Ответы (1)


Я разобрался - я всегда искал источник потоковой передачи, но моя обработка событий на самом деле совершенно нормальная. Проблема заключалась в том, что создание моего водяного знака происходило не непрерывно. Как вы можете видеть в приведенном выше коде, я создал водяной знак только при получении события.

Но после отправки первых трех событий в моей настройке больше не было событий. Поэтому новые водяные знаки не создавались никогда снова.

А поскольку новый водяной знак с отметкой времени, превышающей отметку времени последнего полученного события последовательности, никогда не создавался, Flink никогда не обрабатывал элементы. Причину этого можно найти здесь: Flink CEP - обработка опозданий во время события

Важное предложение:

... и когда появляется водяной знак, обрабатываются все элементы в этом буфере с отметками времени меньше, чем у водяного знака.

Итак, поскольку я создавал водяной знак в BoundedOutOfOrdernessGenerator с задержкой в ​​5,5 секунд, последний водяной знак всегда был за 5,5 секунды до отметки времени последнего события. Следовательно, события никогда не обрабатывались.

Таким образом, одним из решений этого является периодическая генерация водяных знаков, которые предполагают определенную задержку для входящих событий. Для этого нам нужно установить setAutoWatermarkInterval для ExecutionConfig:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
..
ExecutionConfig executionConfig = env.getConfig();
executionConfig.setAutoWatermarkInterval(1000L);

Это позволяет Flink периодически вызывать генератор водяных знаков в заданное время (в данном случае каждую секунду) и запрашивать новый водяной знак.

Кроме того, нам нужно настроить генератор отметок времени / водяных знаков так, чтобы он выдавал новые отметки времени даже без поступления новых событий. Для этого я изменил BoundedOutOfOrdernessTimestampExtractor.

public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Event> {

    private static final long serialVersionUID = 1L;

    /** The current maximum timestamp seen so far. */
    private long currentMaxTimestamp;

    /** The timestamp of the last emitted watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /**
     * The (fixed) interval between the maximum seen timestamp seen in the records
     * and that of the watermark to be emitted.
     */
    private final long maxOutOfOrderness;

    public BoundedOutOfOrdernessGenerator() {
        Time maxOutOfOrderness = Time.seconds(5);

        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " + "lateness to " + maxOutOfOrderness
                    + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }

    /**
     * Extracts the timestamp from the given element.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public long extractTimestamp(Event element) {
        long timestamp = element.getOccurrenceTimeStamp();
        return timestamp;
    }

    @Override
    public final Watermark getCurrentWatermark() {
        Instant instant = Instant.now();
        long nowTimestampMillis = instant.toEpochMilli();
        long latenessTimestamp = nowTimestampMillis - maxOutOfOrderness;

        if (latenessTimestamp >= currentMaxTimestamp) {
            currentMaxTimestamp = latenessTimestamp;
        }

        // this guarantees that the watermark never goes backwards.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(Event element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}

Как вы можете видеть в getCurrentWatermark(), я беру метку времени текущей эпохи, вычитаю максимальную задержку, которую мы ожидаем, а затем создаю водяной знак из этой метки времени.

Вместе Flink теперь запрашивает новую метку времени каждую секунду, а водяной знак всегда «отстает» на 5 секунд. Это позволяет сопоставлять события с определенными шаблонами максимум через 5 секунд после получения последнего события.

Если это работает для вашего сценария, зависит от вашего сценария, потому что это также означает, что события, которые старше 5 секунд (на 5 секунд меньше, чем водяной знак) в момент времени, когда они были получены Flink, отбрасываются и больше не обрабатываются.

person sceee    schedule 16.02.2018