Я использую API Kafka Streams в приложении Java (Spring Cloud Stream). У меня есть конкретный пример использования, а именно:
- Мое приложение будет потреблять из темы A и производить и потреблять из темы B.
- Для каждого сообщения по теме A существует набор соответствующих сообщений, созданных для темы B, которые приложение использует для отслеживания изменений внутреннего состояния. Он потребляет из темы B с помощью KStream, чтобы материализовать это состояние в виде запрошенного хранилища.
Поскольку будет запущено несколько экземпляров приложения и нельзя гарантировать, какие именно разделы любой темы будут назначены экземплярам, крайне важно, чтобы хранилище состояний было общим между приложениями. В противном случае, если для темы B произойдет перебалансировка, экземпляры приложения могут потерять информацию о состоянии, которую они отслеживают для сообщений по теме A. Рассмотрим следующий сценарий:
- Экземпляр 1 имеет раздел 1 для темы A и раздел 1 для темы B.
- Произойдет ребалансировка разделов для темы B.
- Экземпляр 1 теперь имеет раздел 1 темы A (без изменений), но имеет раздел 2 темы B.
- Экземпляр 1 теперь потерял доступ к данным в хранилище состояний, которое он создал, когда у него был раздел 1 для темы B.
Такая же ситуация возникает, если ребалансировка происходит только для темы А.
Возможно ли материализоваться в «глобальное хранилище состояний»? Я понимаю, что существует концепция GlobalKTable, но мне нужно использовать абстракцию KStream, поскольку мне нужен доступ ко всему потоку событий. Для справки, мой потребитель KStream выглядит следующим образом:
@StreamListener(INPUT_TOPIC)
public void consumeKStream(KStream<String, Pojo> kStream) {
kStream.groupByKey(Serialized.with(keySerde, valueSerde)).aggregate(HashMap::new, (key, value, map) -> {
map.put(value.getFoo(), value.getBar()); return map;
}, Materialized.<String, Map<Foo, Bar>, KeyValueStore<Bytes, byte[]>>as(STATE_STORE_NAME)
.withKeySerde(keySerde).withValueSerde(valueMapSerde));
}
through
? - person Jan Held   schedule 20.02.2020