Используете окно Flink и функцию сворачивания, элемент отсутствует?

Когда я пытаюсь объединить элементы с помощью функции окна и сворачивания, некоторые элементы не удается объединить. Получение элементов из Kafka (value:0, value:1, value:2, value:3) и их агрегирование как нечетные и четные значения.

Выход:

{even=[0, 2, 4], odd=[1, 3]}
{even=[6, 8], odd=[5, 7, 9]}
{even=[14, 16, 18], odd=[15, 17]}
{even=[20, 22], odd=[19, 21, 23]}
{even=[24, 26, 28], odd=[25, 27]}

Числа между 10-13 отсутствуют, и это происходит для случайного набора чисел. Может ли кто-нибудь подсказать, что упущено в приведенном ниже коде и как я могу быть уверен, что обработаю все элементы?

public static class Splitter implements FlatMapFunction<String, 
    Tuple3<String, String, List<String>>{
    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap(String value, Collector<Tuple3<String, String, 
        List<String>>out) throws Exception {
        String[] vals = value.split(":");

        if(vals.length 1 && Integer.parseInt(vals[1]) % 2 == 0){
            out.collect(new Tuple3<String, String, List<String>>
             ("test","even", Arrays.asList(vals[1])));
        }else{
            out.collect(new Tuple3<String, String, List<String>>
            ("test","odd", Arrays.asList(vals[1])));
        }
    }
}


    DataStream<Map<String, List<String>>streamValue = 
    kafkaStream.flatMap(new Splitter()).keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(3000))).
    trigger(CustomizedCountTrigger.of(5L))//.trigger(CountTrigger.of(2))
    .fold(new HashMap<String, List<String>>(), new 
    FoldFunction<Tuple3<String, String, List<String>>, Map<String, 
    List<String>>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Map<String, List<String>fold(Map<String, 
        List<String>accumulator,
        Tuple3<String, String, List<String>value) throws 
        Exception {
            if(accumulator.get(value.f1) != null){
                List<Stringlist = new ArrayList<>();
                list.addAll(accumulator.get(value.f1));
                list.addAll(value.f2);
                accumulator.put(value.f1, list);
            }else{
                accumulator.put(value.f1, value.f2);
            }
            return accumulator;
        }
    });

    streamValue.print();
    env.execute("window test");
}


public class CustomizedCountTrigger<W extends Windowextends 
Trigger<Object, W{

    private static final long serialVersionUID = 1L;
    private final long maxCount;

    private final ReducingStateDescriptor<LongstateDesc =
    new ReducingStateDescriptor<>("count", new Sum(), 
    LongSerializer.INSTANCE);

    private CustomizedCountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window,
    TriggerContext ctx) throws Exception {
        ReducingState<Longcount = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window,

    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext

    ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window,

    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext

    ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, 
    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext

    ctx)
    throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public String toString() {
        return "CountTrigger(" +  maxCount + ")";
    }

    public static <W extends WindowCustomizedCountTrigger<Wof(long 
    maxCount) {
        return new CustomizedCountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long{
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }

    }
}

person Sharath    schedule 18.10.2017    source источник


Ответы (1)


Итак, я начал писать первую часть этого, прежде чем замечать, что ваши настраиваемые триггеры делают тот факт, что вы используете окно TumblingEventTime, неактуальным, но я все равно хочу включить свои исходные мысли, так как я не совсем уверен, почему вы должны использовать окно EventTime, когда вы его не используете. Мой ответ после того, как я понял, что это ниже оригинала.

Вы запускаете это на одном параллелизме или на нескольких? Причина, по которой я спрашиваю, заключается в том, что если это множественный параллелизм (а тема kafka также состоит из нескольких разделов), тогда возможно, что сообщения принимаются и обрабатываются в непоследовательном порядке. Это может привести к появлению сообщений с меткой времени, которая заставляет водяной знак продвигаться вперед, заставляя окно обрабатывать сообщения. Затем следующее сообщение (я) имеет время события, которое предшествует текущему времени водяного знака (также известное как опоздание), и это приведет к тому, что сообщение будет отброшено.

Так, например: если у вас есть 20 элементов, и время события каждого такое:

message1: eventTime: 1000 message1: eventTime: 2000 и т. д.

И ваше временное окно события составляет 5001 мс.

Теперь сообщения message1 - message9 проходят по порядку. Это первое окно будет обработано и будет содержать сообщения 1-5 (message6 вызовет обработку окна). Теперь, если message11 приходит раньше message10, это вызовет обработку окна, содержащего сообщения 6-9. А когда приходит сообщение message10, водяной знак уже прошел мимо времени события message10, что приводит к его удалению как более позднему событию.

Правильный ответ

Вместо использования окна eventTime и настраиваемого триггера попробуйте использовать countWindow.

Так что замените это:

.window(TumblingEventTimeWindows.of(Time.milliseconds(3000))).
trigger(CustomizedCountTrigger.of(5L))//.trigger(CountTrigger.of(2))

С этим:

.countWindow(5L)
person Jicaar    schedule 18.10.2017
comment
Большое спасибо за время и объяснения. Я согласен с использованием eventTimeWindow, что приведет к удалению сообщений. Но мой вариант использования примерно такой, как показано ниже. Перед этим я бы пояснил, сказав, что пробовал использовать как parallelism (1), так и parallelism (2), но проблема осталась прежней, некоторые события пропали. - person Sharath; 19.10.2017
comment
Мой вариант использования - обработка набора событий, когда одна из бизнес-логики оценивается как истинная. например, если общее количество событий больше 3 или общее количество событий с четным числом больше 5 или предварительно определенное временное окно пересекается (скажем, 2 секунды). Кроме того, я понял, что если вы переопределите триггер окна одним из наших, то фактический триггер больше не будет считаться., В этом случае, истечение времени окна. env.setStreamTimeCharacteristic (TimeCharacteristic.IngestionTime); env.setParallelism (1); @Jicaar Я ценю ваше мнение. - person Sharath; 19.10.2017
comment
Включение настраиваемого триггера отменяет триггер по умолчанию. Но то, что запускает пользовательский триггер, по-прежнему активно. Поэтому, когда окно TumblingEventTime длительностью 3000 мс завершается, оно запускает настраиваемый метод onEventTime в вашем настраиваемом триггере. Но у вас есть метод onEventTime, настроенный так, чтобы он просто продолжал, а не запускал и / или очищал (тогда как триггер по умолчанию возвращал бы FIRE_AND_PURGE), что делает временное окно события практически бессмысленным, насколько я могу судить. - person Jicaar; 19.10.2017
comment
Кроме того, я не вижу, как вы рассчитываете время события для событий, поэтому я предполагаю, что вы хотите вместо этого использовать время обработки? Но даже если это так, ему предлагается просто продолжить работу в имеющемся у вас настраиваемом триггере, чтобы ничего не произошло. Но если вы действительно хотите использовать eventTime, тогда метод onEventTime в вашем настраиваемом триггере должен возвращать TriggerResult.FIRE_AND_PURGE. Если вы хотите использовать время обработки, пусть ваш метод onProcessingTime в настраиваемом триггере возвращает TriggerResult.FIRE_AND_PURGE и измените TumblingEventTimeWindows.of () на TumblingProcessingTimeWindows.of () - person Jicaar; 19.10.2017
comment
Большое спасибо @Jicaar. Это помогло лучше понять это. - person Sharath; 30.10.2017