У нас есть приложение Apache Flink POC, которое отлично работает локально, но после развертывания в Kinesis Data Analytics (KDA) оно не отправляет записи в приемник.
Используемые технологии
Местный
- Source: Kafka 2.7
- 1 broker
- 1 тема с разделением 1 и фактором репликации 1
- В обработке: Flink 1.12.1
- Приемник: Managed ElasticSearch Service 7.9.1 (тот же экземпляр, что и в случае с AWS)
AWS
- Source: Amazon MSK Kafka 2.8
- 3 brokers (but we are connecting to one)
- 1 тема с разделом 1, фактор репликации 3
- Processing: Amazon KDA Flink 1.11.1
- Parallelism: 2
- Параллельность на KPU: 2
- Приемник: управляемая служба ElasticSearch 7.9.1
Логика приложения
FlinkKafkaConsumer
читает сообщения в формате json из темы- Jsons сопоставляются с объектами домена, называемыми
Telemetry
private static DataStream<Telemetry> SetupKafkaSource(StreamExecutionEnvironment environment){
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER1_ADDRESS.amazonaws.com:9092");
kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("THE_TOPIC", new SimpleStringSchema(), kafkaProperties);
consumer.setStartFromEarliest(); //Just for repeatable testing
return environment
.addSource(consumer)
.map(new MapJsonToTelemetry());
}
- Отметка времени телеметрии выбрана для EventTimeStamp.
3.1. СforMonotonousTimeStamps
StateIso
телеметрии используется дляkeyBy
.
4.1. Двухбуквенный ISO-код штата США- Применяется 5-секундная стратегия переворачивания окна
private static SingleOutputStreamOperator<StateAggregatedTelemetry> SetupProcessing(DataStream<Telemetry> telemetries) {
WatermarkStrategy<Telemetry> wmStrategy =
WatermarkStrategy
.<Telemetry>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.TimeStamp);
return telemetries
.assignTimestampsAndWatermarks(wmStrategy)
.keyBy(t -> t.StateIso)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new WindowCountFunction());
}
- Пользовательский
ProcessWindowFunction
вызывается для выполнения некоторой базовой агрегации.
6.1. Рассчитываем одиночныйStateAggregatedTelemetry
- ElasticSearch настроен как приемник.
7.1.StateAggregatedTelemetry
данные отображаются вHashMap
и помещаются вsource
.
7.2. ВсеsetBulkFlushXYZ
методы имеют низкие значения
private static void SetupElasticSearchSink(SingleOutputStreamOperator<StateAggregatedTelemetry> telemetries) {
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(HttpHost.create("https://ELKCLUSTER_ADDRESS.amazonaws.com:443"));
ElasticsearchSink.Builder<StateAggregatedTelemetry> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
(ElasticsearchSinkFunction<StateAggregatedTelemetry>) (element, ctx, indexer) -> {
Map<String, Object> record = new HashMap<>();
record.put("stateIso", element.StateIso);
record.put("healthy", element.Flawless);
record.put("unhealthy", element.Faulty);
...
LOG.info("Telemetry has been added to the buffer");
indexer.add(Requests.indexRequest()
.index("INDEXPREFIX-"+ from.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
.source(record, XContentType.JSON));
}
);
//Using low values to make sure that the Flush will happen
esSinkBuilder.setBulkFlushMaxActions(25);
esSinkBuilder.setBulkFlushInterval(1000);
esSinkBuilder.setBulkFlushMaxSizeMb(1);
esSinkBuilder.setBulkFlushBackoff(true);
esSinkBuilder.setRestClientFactory(restClientBuilder -> {});
LOG.info("Sink has been attached to the DataStream");
telemetries.addSink(esSinkBuilder.build());
}
Исключенные вещи
- Нам удалось поместить Kafka, KDA и ElasticSearch в один и тот же VPC и одни и те же подсети, чтобы избежать необходимости подписывать каждый запрос.
- Из логов мы могли видеть, что Flink может достичь кластера ES.
Запрос
{
"locationInformation": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:135)",
"logger": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge",
"message": "Pinging Elasticsearch cluster via hosts [https://...es.amazonaws.com:443] ...",
"threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
"applicationARN": "arn:aws:kinesisanalytics:...",
"applicationVersionId": "39",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
Ответ
{
"locationInformation": "org.elasticsearch.client.RequestLogger.logResponse(RequestLogger.java:59)",
"logger": "org.elasticsearch.client.RestClient",
"message": "request [HEAD https://...es.amazonaws.com:443/] returned [HTTP/1.1 200 OK]",
"threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
"applicationARN": "arn:aws:kinesisanalytics:...",
"applicationVersionId": "39",
"messageSchemaVersion": "1",
"messageType": "DEBUG"
}
- Мы также могли убедиться, что сообщения были прочитаны из темы Kafka и отправлены на обработку, просмотрев панель управления Flink
Что мы пробовали безуспешно
- We had implemented a
RichParallelSourceFunction
which emits 1_000_000 messages and then exits- This worked well in the Local environment
- Работа завершена в среде AWS, но нет данных на стороне потребителя.
- We had implemented an other
RichParallelSourceFunction
which emits 100 messages at each second- Basically we had two loops a
while(true)
outer andfor
inner - После внутреннего цикла мы вызвали
Thread.sleep(1000)
- Это отлично работало в локальной среде
- Но в AWS мы могли видеть, что размер контрольных точек постоянно растет, и в ELK не появлялось никаких сообщений.
- Basically we had two loops a
- We have tried to run the KDA application with different parallelism settings
- But there was no difference
- We also tried to use different watermarking strategies (
forBoundedOutOfOrderness
,withIdle
,noWatermarks
)- But there was no difference
- We have added logs for the
ProcessWindowFunction
and for theElasticsearchSinkFunction
- Whenever we run the application from IDEA then these logs were on the console
- Whenever we run the application with KDA then there was no such logs in CloudWatch
- Those logs that were added to the
main
they do appear in the CloudWatch logs
- Those logs that were added to the
Мы предполагаем, что мы не видим данные на стороне приемника, потому что логика обработки окна не запускается. Вот почему не отображаются журналы обработки в CloudWatch.
Любая помощь будет более чем приветствоваться!
Обновление №1
- We have tried to downgrade the Flink version from 1.12.1 to 1.11.1
- There is no change
- We have tried processing time window instead of event time
- It did not even work on the local environment
Обновление №2
Средний размер сообщения составляет около 4 КБ. Вот отрывок из примера сообщения:
{
"affiliateCode": "...",
"appVersion": "1.1.14229",
"clientId": "guid",
"clientIpAddr": "...",
"clientOriginated": true,
"connectionType": "Cable/DSL",
"countryCode": "US",
"design": "...",
"device": "...",
...
"deviceSerialNumber": "...",
"dma": "UNKNOWN",
"eventSource": "...",
"firstRunTimestamp": 1609091112818,
"friendlyDeviceName": "Comcast",
"fullDevice": "Comcast ...",
"geoInfo": {
"continent": {
"code": "NA",
"geoname_id": 120
},
"country": {
"geoname_id": 123,
"iso_code": "US"
},
"location": {
"accuracy_radius": 100,
"latitude": 37.751,
"longitude": -97.822,
"time_zone": "America/Chicago"
},
"registered_country": {
"geoname_id": 123,
"iso_code": "US"
}
},
"height": 720,
"httpUserAgent": "Mozilla/...",
"isLoggedIn": true,
"launchCount": 19,
"model": "...",
"os": "Comcast...",
"osVersion": "...",
...
"platformTenantCode": "...",
"productCode": "...",
"requestOrigin": "https://....com",
"serverTimeUtc": 1617809474787,
"serviceCode": "...",
"serviceOriginated": false,
"sessionId": "guid",
"sessionSequence": 2,
"subtype": "...",
"tEventId": "...",
...
"tRegion": "us-east-1",
"timeZoneOffset": 5,
"timestamp": 1617809473305,
"traits": {
"isp": "Comcast Cable",
"organization": "..."
},
"type": "...",
"userId": "guid",
"version": "v1",
"width": 1280,
"xb3traceId": "guid"
}
Мы используем ObjectMapper
для анализа только некоторых полей файла json. Вот как выглядит класс Telemetry
:
public class Telemetry {
public String AppVersion;
public String CountryCode;
public String ClientId;
public String DeviceSerialNumber;
public String EventSource;
public String SessionId;
public TelemetrySubTypes SubType; //enum
public String TRegion;
public Long TimeStamp;
public TelemetryTypes Type; //enum
public String StateIso;
...
}
Обновление №3
Источник
Вкладка Подзадачи
ID | Bytes received | Records received | Bytes sent | Records sent | Status |
---|---|---|---|---|---|
0 | 0 B | 0 | 0 B | 0 | RUNNING |
1 | 0 B | 0 | 2.83 MB | 15,000 | RUNNING |
Вкладка водяные знаки
Нет данных
Окно
Вкладка Подзадачи
ID | Bytes received | Records received | Bytes sent | Records sent | Status |
---|---|---|---|---|---|
0 | 1.80 MB | 9,501 | 0 B | 0 | RUNNING |
1 | 1.04 MB | 5,499 | 0 B | 0 | RUNNING |
Водяные знаки
SubTask | Watermark |
---|---|
1 | No Watermark |
2 | No Watermark |