Я пишу потоковое приложение с Kafka Streams, Spring-Kafka и Spring Boot. Я не могу найти никакой информации, как правильно протестировать потоковую обработку, выполняемую Kafka Streams DSL при использовании Spring-Kafka. В документации упоминается EmbeddedKafkaBroker, но, похоже, нет информации о том, как обрабатывать тестирование, например, хранилища состояний.
Просто чтобы представить простой пример того, что я хотел бы протестировать. У меня зарегистрирован следующий bean-компонент (где Item создается avro):
@Bean
public KTable<String, Long> itemTotalKTable(StreamsBuilder streamsBuilder) {
return streamsBuilder
.stream(ITEM_TOPIC,
Consumed.with(Serdes.String(), itemAvroSerde))
.mapValues((id, item) -> item.getNumber())
.groupByKey()
.aggregate(
() -> 0L,
(id, number, agg) -> agg + number,
Materialized.with(Serdes.String(), Serdes.Long()));
}
Как правильно проверить агрегирование всех номеров позиций?