Окно не запускается, когда мы развернули приложение Flink в Kinesis Data Analytics.

У нас есть приложение Apache Flink POC, которое отлично работает локально, но после развертывания в Kinesis Data Analytics (KDA) оно не отправляет записи в приемник.

Используемые технологии

Местный

  • Source: Kafka 2.7
    • 1 broker
    • 1 тема с разделением 1 и фактором репликации 1
  • В обработке: Flink 1.12.1
  • Приемник: Managed ElasticSearch Service 7.9.1 (тот же экземпляр, что и в случае с AWS)

AWS

  • Source: Amazon MSK Kafka 2.8
    • 3 brokers (but we are connecting to one)
    • 1 тема с разделом 1, фактор репликации 3
  • Processing: Amazon KDA Flink 1.11.1
    • Parallelism: 2
    • Параллельность на KPU: 2
  • Приемник: управляемая служба ElasticSearch 7.9.1

Логика приложения

  1. FlinkKafkaConsumer читает сообщения в формате json из темы
  2. Jsons сопоставляются с объектами домена, называемыми Telemetry
private static DataStream<Telemetry> SetupKafkaSource(StreamExecutionEnvironment environment){
    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER1_ADDRESS.amazonaws.com:9092");
    kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("THE_TOPIC", new SimpleStringSchema(), kafkaProperties);

    consumer.setStartFromEarliest(); //Just for repeatable testing

    return environment
            .addSource(consumer)
            .map(new MapJsonToTelemetry());
}
  1. Отметка времени телеметрии выбрана для EventTimeStamp.
    3.1. С forMonotonousTimeStamps
  2. StateIso телеметрии используется для keyBy.
    4.1. Двухбуквенный ISO-код штата США
  3. Применяется 5-секундная стратегия переворачивания окна
private static SingleOutputStreamOperator<StateAggregatedTelemetry> SetupProcessing(DataStream<Telemetry> telemetries) {
    WatermarkStrategy<Telemetry> wmStrategy =
            WatermarkStrategy
                    .<Telemetry>forMonotonousTimestamps()
                    .withTimestampAssigner((event, timestamp) -> event.TimeStamp);

    return telemetries
            .assignTimestampsAndWatermarks(wmStrategy)
            .keyBy(t -> t.StateIso)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .process(new WindowCountFunction());
}
  1. Пользовательский ProcessWindowFunction вызывается для выполнения некоторой базовой агрегации.
    6.1. Рассчитываем одиночный StateAggregatedTelemetry
  2. ElasticSearch настроен как приемник.
    7.1. StateAggregatedTelemetry данные отображаются в HashMap и помещаются в source.
    7.2. Все setBulkFlushXYZ методы имеют низкие значения
private static void SetupElasticSearchSink(SingleOutputStreamOperator<StateAggregatedTelemetry> telemetries) {
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(HttpHost.create("https://ELKCLUSTER_ADDRESS.amazonaws.com:443"));

    ElasticsearchSink.Builder<StateAggregatedTelemetry> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            (ElasticsearchSinkFunction<StateAggregatedTelemetry>) (element, ctx, indexer) -> {
                Map<String, Object> record = new HashMap<>();

                record.put("stateIso", element.StateIso);
                record.put("healthy", element.Flawless);
                record.put("unhealthy", element.Faulty);
                ...

                LOG.info("Telemetry has been added to the buffer");
                indexer.add(Requests.indexRequest()
                        .index("INDEXPREFIX-"+ from.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
                        .source(record, XContentType.JSON));
            }
    );

    //Using low values to make sure that the Flush will happen
    esSinkBuilder.setBulkFlushMaxActions(25);
    esSinkBuilder.setBulkFlushInterval(1000);
    esSinkBuilder.setBulkFlushMaxSizeMb(1);
    esSinkBuilder.setBulkFlushBackoff(true);
    esSinkBuilder.setRestClientFactory(restClientBuilder -> {});

    LOG.info("Sink has been attached to the DataStream");
    telemetries.addSink(esSinkBuilder.build());
}

Исключенные вещи

  • Нам удалось поместить Kafka, KDA и ElasticSearch в один и тот же VPC и одни и те же подсети, чтобы избежать необходимости подписывать каждый запрос.
  • Из логов мы могли видеть, что Flink может достичь кластера ES.
    Запрос
{
    "locationInformation": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:135)",
    "logger": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge",
    "message": "Pinging Elasticsearch cluster via hosts [https://...es.amazonaws.com:443] ...",
    "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
    "applicationARN": "arn:aws:kinesisanalytics:...",
    "applicationVersionId": "39",
    "messageSchemaVersion": "1",
    "messageType": "INFO"
}

Ответ

{
    "locationInformation": "org.elasticsearch.client.RequestLogger.logResponse(RequestLogger.java:59)",
    "logger": "org.elasticsearch.client.RestClient",
    "message": "request [HEAD https://...es.amazonaws.com:443/] returned [HTTP/1.1 200 OK]",
    "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
    "applicationARN": "arn:aws:kinesisanalytics:...",
    "applicationVersionId": "39",
    "messageSchemaVersion": "1",
    "messageType": "DEBUG"
}
  • Мы также могли убедиться, что сообщения были прочитаны из темы Kafka и отправлены на обработку, просмотрев панель управления Flink  Данные отправляются и принимаются между задачами

Что мы пробовали безуспешно

  • We had implemented a RichParallelSourceFunction which emits 1_000_000 messages and then exits
    • This worked well in the Local environment
    • Работа завершена в среде AWS, но нет данных на стороне потребителя.
  • We had implemented an other RichParallelSourceFunction which emits 100 messages at each second
    • Basically we had two loops a while(true) outer and for inner
    • После внутреннего цикла мы вызвали Thread.sleep(1000)
    • Это отлично работало в локальной среде
    • Но в AWS мы могли видеть, что размер контрольных точек постоянно растет, и в ELK не появлялось никаких сообщений.
  • We have tried to run the KDA application with different parallelism settings
    • But there was no difference
  • We also tried to use different watermarking strategies (forBoundedOutOfOrderness, withIdle, noWatermarks)
    • But there was no difference
  • We have added logs for the ProcessWindowFunction and for the ElasticsearchSinkFunction
    • Whenever we run the application from IDEA then these logs were on the console
    • Whenever we run the application with KDA then there was no such logs in CloudWatch
      • Those logs that were added to the main they do appear in the CloudWatch logs

Мы предполагаем, что мы не видим данные на стороне приемника, потому что логика обработки окна не запускается. Вот почему не отображаются журналы обработки в CloudWatch.

Любая помощь будет более чем приветствоваться!


Обновление №1

  • We have tried to downgrade the Flink version from 1.12.1 to 1.11.1
    • There is no change
  • We have tried processing time window instead of event time
    • It did not even work on the local environment

Обновление №2

Средний размер сообщения составляет около 4 КБ. Вот отрывок из примера сообщения:

{
  "affiliateCode": "...",
  "appVersion": "1.1.14229",
  "clientId": "guid",
  "clientIpAddr": "...",
  "clientOriginated": true,
  "connectionType": "Cable/DSL",
  "countryCode": "US",
  "design": "...",
  "device": "...",
  ...
  "deviceSerialNumber": "...",
  "dma": "UNKNOWN",
  "eventSource": "...",
  "firstRunTimestamp": 1609091112818,
  "friendlyDeviceName": "Comcast",
  "fullDevice": "Comcast ...",
  "geoInfo": {
    "continent": {
      "code": "NA",
      "geoname_id": 120
    },
    "country": {
      "geoname_id": 123,
      "iso_code": "US"
    },
    "location": {
      "accuracy_radius": 100,
      "latitude": 37.751,
      "longitude": -97.822,
      "time_zone": "America/Chicago"
    },
    "registered_country": {
      "geoname_id": 123,
      "iso_code": "US"
    }
  },
  "height": 720,
  "httpUserAgent": "Mozilla/...",
  "isLoggedIn": true,
  "launchCount": 19,
  "model": "...",
  "os": "Comcast...",
  "osVersion": "...",
  ...
  "platformTenantCode": "...",
  "productCode": "...",
  "requestOrigin": "https://....com",
  "serverTimeUtc": 1617809474787,
  "serviceCode": "...",
  "serviceOriginated": false,
  "sessionId": "guid",
  "sessionSequence": 2,
  "subtype": "...",
  "tEventId": "...",
  ...
  "tRegion": "us-east-1",
  "timeZoneOffset": 5,
  "timestamp": 1617809473305,
  "traits": {
    "isp": "Comcast Cable",
    "organization": "..."
  },
  "type": "...",
  "userId": "guid",
  "version": "v1",
  "width": 1280,
  "xb3traceId": "guid"
}

Мы используем ObjectMapper для анализа только некоторых полей файла json. Вот как выглядит класс Telemetry:

public class Telemetry {
    public String AppVersion;
    public String CountryCode;
    public String ClientId;
    public String DeviceSerialNumber;
    public String EventSource;
    public String SessionId;
    public TelemetrySubTypes SubType; //enum
    public String TRegion;
    public Long TimeStamp;
    public TelemetryTypes Type; //enum
    public String StateIso;
    
    ...
}

Обновление №3

Источник

Вкладка Подзадачи

ID Bytes received Records received Bytes sent Records sent Status
0 0 B 0 0 B 0 RUNNING
1 0 B 0 2.83 MB 15,000 RUNNING

Вкладка водяные знаки

Нет данных

Окно

Вкладка Подзадачи

ID Bytes received Records received Bytes sent Records sent Status
0 1.80 MB 9,501 0 B 0 RUNNING
1 1.04 MB 5,499 0 B 0 RUNNING

Водяные знаки

SubTask Watermark
1 No Watermark
2 No Watermark

person Peter Csala    schedule 14.05.2021    source источник
comment
Можете ли вы отбросить лишь образец данных, которые вы читаете из Kafka?   -  person Dominik Wosiński    schedule 18.05.2021
comment
@ DominikWosiński Я расширил свой вопрос. Спасибо за ваши усилия.   -  person Peter Csala    schedule 18.05.2021
comment
Еще несколько вопросов, проверили ли вы водяные знаки, сгенерированные параллельными экземплярами первых операторов? Вы также проверяли, сколько данных отправил каждый экземпляр оператора? (оба видны в пользовательском интерфейсе)   -  person Dominik Wosiński    schedule 18.05.2021
comment
@ DominikWosiński Я расширил свой вопрос соответствующей информацией. Мне кажется довольно странным, что нет водяного знака.   -  person Peter Csala    schedule 18.05.2021
comment
Да, поэтому моя первая идея заключается в том, что это связано с неправильным отношением параллелизм / разделение. Вы сказали, что тестировали его с другим параллелизмом, но не уверены, пробовали ли вы параллелизм 1. Сообщите мне, поможет ли это.   -  person Dominik Wosiński    schedule 18.05.2021
comment
@ DominikWosiński Оказалось, что мы допустили серьезную ошибку, забыли установить временные характеристики. Я разместил сообщение, в котором подробно описал проблему, если вам интересно.   -  person Peter Csala    schedule 19.05.2021


Ответы (2)


Согласно комментариям и дополнительной информации, которую вы предоставили, похоже, что проблема заключается в том, что два потребителя Flink не могут использовать один и тот же раздел. Итак, в вашем случае только один параллельный экземпляр оператора будет потреблять из раздела kafka, а другой будет бездействовать.

Обычно оператор Flink выбирает MIN([all_downstream_parallel_watermarks]), поэтому в вашем случае один потребитель Kafka будет создавать обычные водяные знаки, а другой никогда ничего не будет создавать (в этом случае flink предполагает Long.Min), поэтому Flink выберет нижний, который равен Long.Min. Таким образом, окно никогда не будет запущено, потому что во время передачи данных один из водяных знаков никогда не создается. Хорошая практика - использовать тот же параллелизм, что и количество разделов Kafka при работе с Kafka.

person Dominik Wosiński    schedule 18.05.2021
comment
Я изменил параллелизм на 1, как вы предложили (так что есть только одна подзадача). На вкладке "Водяные знаки" исходного оператора по-прежнему указано Нет данных. На вкладке "Водяные знаки" оператора окна по-прежнему указано Нет водяного знака. - person Peter Csala; 18.05.2021

После сеанса поддержки с людьми из AWS выяснилось, что мы пропустили установку временной характеристики в потоковой среде.

  • В 1.11.1 значение по умолчанию TimeCharacteristic было IngestionTime.
  • Начиная с 1.12.1 (см. Связанный примечания к выпуску) значение по умолчанию - EventTime:

В Flink 1.12 временная характеристика потока по умолчанию была изменена на EventTime, поэтому вам больше не нужно вызывать этот метод для включения поддержки времени события.

Итак, после того, как мы явно установили EventTime, он начал генерировать водяные знаки, как шарм:

streamingEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
person Peter Csala    schedule 19.05.2021