Материализовать KStream в глобальное хранилище?

Я использую API Kafka Streams в приложении Java (Spring Cloud Stream). У меня есть конкретный пример использования, а именно:

  • Мое приложение будет потреблять из темы A и производить и потреблять из темы B.
  • Для каждого сообщения по теме A существует набор соответствующих сообщений, созданных для темы B, которые приложение использует для отслеживания изменений внутреннего состояния. Он потребляет из темы B с помощью KStream, чтобы материализовать это состояние в виде запрошенного хранилища.

Поскольку будет запущено несколько экземпляров приложения и нельзя гарантировать, какие именно разделы любой темы будут назначены экземплярам, ​​крайне важно, чтобы хранилище состояний было общим между приложениями. В противном случае, если для темы B произойдет перебалансировка, экземпляры приложения могут потерять информацию о состоянии, которую они отслеживают для сообщений по теме A. Рассмотрим следующий сценарий:

  • Экземпляр 1 имеет раздел 1 для темы A и раздел 1 для темы B.
  • Произойдет ребалансировка разделов для темы B.
  • Экземпляр 1 теперь имеет раздел 1 темы A (без изменений), но имеет раздел 2 темы B.
  • Экземпляр 1 теперь потерял доступ к данным в хранилище состояний, которое он создал, когда у него был раздел 1 для темы B.

Такая же ситуация возникает, если ребалансировка происходит только для темы А.

Возможно ли материализоваться в «глобальное хранилище состояний»? Я понимаю, что существует концепция GlobalKTable, но мне нужно использовать абстракцию KStream, поскольку мне нужен доступ ко всему потоку событий. Для справки, мой потребитель KStream выглядит следующим образом:

    @StreamListener(INPUT_TOPIC)
    public void consumeKStream(KStream<String, Pojo> kStream) {
        kStream.groupByKey(Serialized.with(keySerde, valueSerde)).aggregate(HashMap::new, (key, value, map) -> {
            map.put(value.getFoo(), value.getBar()); return map;
        }, Materialized.<String, Map<Foo, Bar>, KeyValueStore<Bytes, byte[]>>as(STATE_STORE_NAME)
                .withKeySerde(keySerde).withValueSerde(valueMapSerde));
    }

comment
Не могли бы вы объяснить немного подробнее, как вы хотите, чтобы ваше приложение работало? Я не уверен, понимаю ли я точную проблему. Разве вы не можете просто перенаправить данные по другой теме с помощью through?   -  person Jan Held    schedule 20.02.2020
comment
что вы имеете в виду под абстракцией KStream? У Spring нет собственной Ktable, AFAIK   -  person OneCricketeer    schedule 21.02.2020


Ответы (1)


Если вы читаете из темы A и из темы B, и у вас есть топология, которая материализует данные из темы B и выполняет поиск в хранилище для записи темы A, у вас будет гарантия, что экземпляр получит совместно секционированное назначение. Следовательно, описанного вами сценария никогда не произойдет.

Вы можете проверить это, проверив себя Topology (через describe()), который включает суб-топологии. Субтопологии выполняются, поскольку задачи и задачи имеют гарантированное совместное разделение входных тем.

Сравните: https://docs.confluent.io/current/streams/architecture.html#parallelism-model

person Matthias J. Sax    schedule 23.02.2020