Как тестировать приложения Kafka Streams с помощью Spring Kafka?

Я пишу потоковое приложение с Kafka Streams, Spring-Kafka и Spring Boot. Я не могу найти никакой информации, как правильно протестировать потоковую обработку, выполняемую Kafka Streams DSL при использовании Spring-Kafka. В документации упоминается EmbeddedKafkaBroker, но, похоже, нет информации о том, как обрабатывать тестирование, например, хранилища состояний.

Просто чтобы представить простой пример того, что я хотел бы протестировать. У меня зарегистрирован следующий bean-компонент (где Item создается avro):


    @Bean
    public KTable<String, Long> itemTotalKTable(StreamsBuilder streamsBuilder) {
        return streamsBuilder
                .stream(ITEM_TOPIC,
                        Consumed.with(Serdes.String(), itemAvroSerde))
                .mapValues((id, item) -> item.getNumber())
                .groupByKey()
                .aggregate(
                        () -> 0L,
                        (id, number, agg) -> agg + number,
                        Materialized.with(Serdes.String(), Serdes.Long()));
    }

Как правильно проверить агрегирование всех номеров позиций?




Ответы (2)


Spring Kafka для поддержки Kafka Streams не приносит никаких дополнительных API, особенно при построении потоков и их обработке.

Недавно мы открыли для себя, что есть хорошая kafka-streams-test-utils библиотека, которую можно использовать в модульных тестах без запуска брокера Kafka (даже встроенного).

В нескольких наших тестах мы имеем что-то вроде этого:

    KStream<String, String> stream = builder.stream(INPUT);
    stream
            .transform(() -> enricher)
            .to(OUTPUT);

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
    TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);

    ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(),
            new StringSerializer());
    driver.pipeInput(recordFactory.create(INPUT, "key", "value"));
    ProducerRecord<byte[], byte[]> result = driver.readOutput(OUTPUT);
    assertThat(result.headers().lastHeader("foo")).isNotNull();

Я считаю, что в этом TopologyTestDriver должен быть какой-то API для работы с упомянутым государственным хранилищем.

person Artem Bilan    schedule 03.09.2019
comment
Если кому-то нужно, я поделился репозиторием GitHub, в котором показано, как интегрировать Spring Kafka и тестовую библиотеку kafka-streams-test-utils: github.com/rcardin/spring-boot-kafka-stream-test - person riccardo.cardin; 22.05.2020

Возможно, вы могли бы создать метод, который принимает ваш KTable в качестве параметра и вызывает на нем .toStream().to(topicname,Produced.with(keyserde, valueserde)), тогда вы могли бы сделать следующее:

MyTopologyBuilder builder = new MyTopologyBuilder();

testDriver = new TopologyTestDriver(builder.build(), config);

ConsumerRecord<byte[], byte[]> input = createStepRecord(key, record);

testDriver.pipeInput(input);

ProducerRecord<String, String> out testDriver.readOutput(topic, new StringDeserializer(), ew AvroDeserializer<>(MyClass.class);

assertThat(out.key(), is(key));
assertEquals(myPredefinedValue, out.value());
assertEquals(5, out.value().getMyList().size());

Это должно сработать, но, я думаю, могут быть более элегантные способы.

person Any1    schedule 03.09.2019