У нас есть ситуация, в которой я думаю, что Kafka Streams может помочь, но я не могу найти никакой документации или примеров, показывающих, как это сделать.
Я нашел один похожий вопрос, но в нем нет рекомендаций по реализации: Функция ожидания Kafka Streams с зависимыми объектами
Что я хочу сделать:
Я хотел бы сопоставить связанные записи из темы Kafka в один объект и опубликовать этот новый объект в отдельной выходной теме. Например, может быть пять записей сообщений, связанных друг с другом уникальным ключом — я хочу создать новый объект из этих связанных объектов и создать его для новой темы.
Я хочу, чтобы все связанные события в течение скользящего окна в один час были объединены. Другими словами, как только сообщение A с идентификатором «123» поступает к потребителю, приложение должно ждать не менее одного часа, пока не поступят остальные записи с идентификатором «123». После того, как все записи поступили или прошел один час, срок действия этих записей истек.
Наконец, все связанные сообщения, собранные за час, используются для создания нового объекта, который затем отправляется в другую тему Kafka.
Проблемы, с которыми я столкнулся.
Скользящее окно в Kafka работает только при объединении двух потоков. У нас будет только один поток, подключенный к теме — я не знаю, зачем нужны два потока или как мы будем это реализовывать. Я не могу найти никаких примеров этого в Интернете. Все потоковые функции, которые я видел в Kafka, просто агрегируют/сводят к простому значению при сборе событий одного и того же ключа. Например, сколько раз появляется клавиша или суммируется какое-то значение
Вот некоторый псевдокод для описания того, о чем я говорю. Имена/семантика функций будут другими, если функциональность существует.
KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
kstream.windowedBy(
// One hour sliding Window
)
.collectAllRelatedKeys(
// Collect all Records related to each key
// map == HashMap<Key, ArrayList<Value>>
map.get(key).add(value);
)
.transformAndProcess(
if(ALL_EVENTS_COLLECTED) {
// Create new Object from all related records
newObject =
createNewObjectFromRelatedRecordsFunction(map.get(key));
producer.send(newObject);
}
)
Вопросы (и спасибо за помощь):
- Как я могу использовать скользящие окна с одним потоком?
- Как настроить функции KStream/KTable для сбора всех связанных событий во временном окне и создания нового объекта для другой темы?
- Как управление подтверждением/смещением работает с потоками скользящего окна?
- Может ли это гарантировать доставку «Ровно один раз»? Для справки: https://www.confluent.io/blog/enabling-exactly-kafka-streams/
transform()
с прикрепленным состоянием. - person Matthias J. Sax   schedule 06.03.2018