События сообщений, связанные с Kafka KStream, в скользящем окне

У нас есть ситуация, в которой я думаю, что Kafka Streams может помочь, но я не могу найти никакой документации или примеров, показывающих, как это сделать.

Я нашел один похожий вопрос, но в нем нет рекомендаций по реализации: Функция ожидания Kafka Streams с зависимыми объектами

Что я хочу сделать:

Я хотел бы сопоставить связанные записи из темы Kafka в один объект и опубликовать этот новый объект в отдельной выходной теме. Например, может быть пять записей сообщений, связанных друг с другом уникальным ключом — я хочу создать новый объект из этих связанных объектов и создать его для новой темы.

Я хочу, чтобы все связанные события в течение скользящего окна в один час были объединены. Другими словами, как только сообщение A с идентификатором «123» поступает к потребителю, приложение должно ждать не менее одного часа, пока не поступят остальные записи с идентификатором «123». После того, как все записи поступили или прошел один час, срок действия этих записей истек.

Наконец, все связанные сообщения, собранные за час, используются для создания нового объекта, который затем отправляется в другую тему Kafka.

Проблемы, с которыми я столкнулся.

Скользящее окно в Kafka работает только при объединении двух потоков. У нас будет только один поток, подключенный к теме — я не знаю, зачем нужны два потока или как мы будем это реализовывать. Я не могу найти никаких примеров этого в Интернете. Все потоковые функции, которые я видел в Kafka, просто агрегируют/сводят к простому значению при сборе событий одного и того же ключа. Например, сколько раз появляется клавиша или суммируется какое-то значение

Вот некоторый псевдокод для описания того, о чем я говорю. Имена/семантика функций будут другими, если функциональность существует.

    KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
    kstream.windowedBy(
    // One hour sliding Window
    )
    .collectAllRelatedKeys(
    // Collect all Records related to each key
    // map == HashMap<Key, ArrayList<Value>>
       map.get(key).add(value);
    )
    .transformAndProcess(
        if(ALL_EVENTS_COLLECTED) {
        // Create new Object from all related records
            newObject = 
            createNewObjectFromRelatedRecordsFunction(map.get(key));
            producer.send(newObject);   
        }
    )

Вопросы (и спасибо за помощь):

  1. Как я могу использовать скользящие окна с одним потоком?
  2. Как настроить функции KStream/KTable для сбора всех связанных событий во временном окне и создания нового объекта для другой темы?
  3. Как управление подтверждением/смещением работает с потоками скользящего окна?
  4. Может ли это гарантировать доставку «Ровно один раз»? Для справки: https://www.confluent.io/blog/enabling-exactly-kafka-streams/

person mikemikemike    schedule 05.03.2018    source источник
comment
Может ли это гарантировать доставку Exactly Once? - такой вещи просто не существует, ничто не может ее гарантировать. Вы всегда должны стремиться к сценарию доставки хотя бы один раз + идемпотентному действию.   -  person Etki    schedule 06.03.2018
comment
Это гарантия, о которой я говорю. Добавлю к вопросу, спасибо! confluent.io /блог/   -  person mikemikemike    schedule 06.03.2018
comment
Вы не можете использовать DSL для своего подхода, но вам необходимо реализовать логику работы с окнами вручную, используя шаг transform() с прикрепленным состоянием.   -  person Matthias J. Sax    schedule 06.03.2018
comment
@mikemikemike, в конце концов, каким было твое решение? Не могли бы вы опубликовать ответ или обновить свой вопрос?   -  person mathiasfk    schedule 21.05.2019