Apache Flink: как подсчитать общее количество событий в DataStream

У меня есть два необработанных потока, и я присоединяюсь к этим потокам, а затем я хочу подсчитать, какое общее количество событий было присоединено, а сколько событий нет. Я делаю это, используя карту на joinedEventDataStream, как показано ниже.

joinedEventDataStream.map(new RichMapFunction<JoinedEvent, Object>() {

            @Override
            public Object map(JoinedEvent joinedEvent) throws Exception {

                number_of_joined_events += 1;

                return null;
            }
        });

Вопрос №1: Это подходящий способ подсчета количества событий в потоке?

Вопрос № 2: Я заметил зашитое поведение, которому некоторые из вас могут не поверить. Проблема в том, что когда я запускаю свою программу Flink в IntelliJ IDE, она показывает мне правильное значение для number_of_joined_events, но 0 в том случае, когда я отправляю эту программу как jar. Таким образом, я получаю начальное значение number_of_joined_events, когда запускаю программу как jar файл вместо фактического количества. Почему это происходит только в случае отправки jar файла, а не в IDE?


person Amarjit Dhillon    schedule 12.11.2017    source источник


Ответы (1)


Ваш подход не работает. Ожидается поведение, которое вы заметили при выполнении программы через файл JAR.

Я не знаю, как определяется number_of_joined_events, но я предполагаю, что это статическая переменная в вашей программе. Когда вы запускаете программу в своей среде IDE, она запускается в одной JVM. Следовательно, все операторы имеют доступ к статической переменной. Когда вы отправляете файл JAR удаленному процессу, программа выполняется в другой JVM (возможно, на нескольких JVM), и статическая переменная в вашем клиентском процессе никогда не обновляется.

Вы можете использовать метрики Flink или ReduceFunction, который суммирует 1s, чтобы подсчитать количество обработанных записей.

person Fabian Hueske    schedule 13.11.2017
comment
вау, ты гений. это статическая переменная, я использовал аккумулятор в connectedDataStrem, и он работал в файле jar. хотя это заняло у меня выходные :). Благодаря тонну - person Amarjit Dhillon; 13.11.2017