Агрегация транзакций в Apache Flink

Я пытался понять, как написать программу flink, которая получает события из 3 тем кафки, суммирует их на сегодня, вчера и позавчера.

поэтому первый вопрос: как я могу суммировать транзакцию за 3 разных дня и извлечь их в виде файла json


person TheEliteOne    schedule 06.02.2018    source источник
comment
Ваш вопрос несколько неясен. Когда вы говорите 3 потока кафки, вы имеете в виду библиотеку Kafka Streams или вы имели в виду 3 темы кафки? В любом случае, я предлагаю вам начать с документов ci.apache.org/projects/flink/flink-docs-release-1.4/dev/ или несколько примеров training.data-artisans.com/exercises/toFromKafka.html.   -  person David Anderson    schedule 06.02.2018
comment
@alpinegizmo спасибо за ответ, я изменил вопрос   -  person TheEliteOne    schedule 06.02.2018
comment
как вы хотите сгруппировать свои транзакции? Если у вас есть следующие идентификаторы транзакций: 1,2,3,4,5,6, будут ли группы 1,2,3 и 4,5,6 или группы должны быть скользящими группами 1,2,3 - 2,3 ,4 - 3,4,5 - .....?   -  person diegoreico    schedule 07.02.2018
comment
@DiegoReirizCores хорошо, я имел в виду группировку, группировку всех транзакций дня и возврат их числа в виде json   -  person TheEliteOne    schedule 07.02.2018
comment
о, тогда в моих примерах размер окна должен быть изменен на 1, чтобы сгруппировать все сообщения за день, а не все сообщения за последние 3 дня.   -  person diegoreico    schedule 07.02.2018


Ответы (1)


Если вы хотите читать из 3 разных тем или разделов кафки, вам нужно создать 3 источника кафки.

документация Flink о потребителе kafka

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val consumer0 = new FlinkKafkaConsumer08[String](...)
val consumer1 = new FlinkKafkaConsumer08[String](...)
val consumer2 = new FlinkKafkaConsumer08[String](...)
consumer0.setStartFromGroupOffsets()
consumer1.setStartFromGroupOffsets()
consumer2.setStartFromGroupOffsets()

val stream0 = env.addSource(consumer0)
val stream1 = env.addSource(consumer1)
val stream2 = env.addSource(consumer2)

val unitedStream = stream0.union(stream1,stream2)

/* Logic to group transactions from 3 days */
/* I need more info, but it should be a Sliding or Fixed windows Keyed by the id of the transactions*/

val windowSize = 1 // number of days that the window use to group events
val windowStep = 1 // window slides 1 day

val reducedStream = unitedStream
    .keyBy("transactionId") // or any field that groups transactions in the same group
    .timeWindow(Time.days(windowSize),Time.days(windowStep))
    .map(transaction => {
        transaction.numberOfTransactions = 1
        transaction
    }).sum("numberOfTransactions");

val streamFormatedAsJson = reducedStream.map(functionToParseDataAsJson) 
// you can use a library like GSON for this
// or a scala string template

streamFormatedAsJson.sink(yourFavoriteSinkToWriteYourData)

Если имена ваших тем можно сопоставить с регулярным выражением, вы можете создать только одного потребителя kafka следующим образом:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val consumer = new FlinkKafkaConsumer08[String](
  java.util.regex.Pattern.compile("day-[1-3]"),
  ..., //check documentation to know how to fill this field
  ...) //check documentation to know how to fill this field

val stream = env.addSource(consumer)

Наиболее распространенным подходом является размещение всех транзакций в одной и той же теме kafka, а не в разных темах, в этом случае код будет более простым, потому что вам нужно использовать только окно для обработки ваших данных.

Day 1 -> 11111 -\
Day 2 -> 22222 --> 1111122222333 -> Window -> 11111 22222 333 -> reduce operation per window partition
Day 3 -> 3333 --/                            |-----|-----|---|

Пример кода

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val consumer = new FlinkKafkaConsumer08[String](...)
consumer.setStartFromGroupOffsets()

val stream = env.addSource(consumer)

/* Logic to group transactions from 3 days */
/* I need more info, but it should be a Sliding or Fixed windows Keyed by the id of the transactions*/

val windowSize = 1 // number of days that the window use to group events
val windowStep = 1 // window slides 1 day

val reducedStream = stream
    .keyBy("transactionId") // or any field that groups transactions in the same group
    .timeWindow(Time.days(windowSize),Time.days(windowStep))
    .map(transaction => {
        transaction.numberOfTransactions = 1
        transaction
    }).sum("numberOfTransactions");

val streamFormatedAsJson = reducedStream.map(functionToParseDataAsJson) 
// you can use a library like GSON for this
// or a scala string template

streamFormatedAsJson.sink(yourFavoriteSinkToWriteYourData)
person diegoreico    schedule 07.02.2018
comment
Спаси Господи за помощь! - person TheEliteOne; 07.02.2018
comment
Нет проблем;) просто опубликуйте еще один комментарий, если вам нужна дополнительная помощь, и отметьте вопрос как решенный, если это работает для вас, чтобы другие люди, которые отвечают на вопросы, могли знать, что это уже решено, и сосредоточиться на помощи другим людям. - person diegoreico; 07.02.2018
comment
я дам ссылку на мой github, чтобы вы могли проверить, что я сделал :) если вы не против ofc :D кстати, я действительно хочу изучить flink/kafka знаете ли вы какие-нибудь интернет-курсы? - person TheEliteOne; 07.02.2018
comment
Конечно! я проверю ;) и у людей из мастеров обработки данных есть несколько хороших примеров, поэтому я Я изучаю Флинка и Кафку, каждый раз пробую разные вещи, когда что-то делаю. - person diegoreico; 07.02.2018
comment
это ссылка github.com/anvarknian/flinkquickstartscala проверьте ее и скажите, все ли в порядке там :D - person TheEliteOne; 07.02.2018
comment
@TheEliteOne, это была моя ошибка. Те groupBy должны быть keyBy. Я уже изменил пример - person diegoreico; 07.02.2018