Spring-kafka-test тестирование JSON-сообщения с настраиваемой десериализацией

Я в основном успешно настраивал тест с использованием EmbeddedKafka, который создает и использует одно сообщение. Ниже представлена ​​рабочая версия теста:

@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka
public class KafkaFlowTest {

    @Autowired
    EmbeddedKafkaBroker broker;

    public static final String EMAIL_TOPIC = "email-service";

    static {
        System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
    }

    private Consumer<String, KafkaEmailMessageWrapper> consumer;
    private Producer<String, KafkaEmailMessageWrapper> producer;

    @Before
    public void setUp() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "true", broker);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        DefaultKafkaConsumerFactory<String, KafkaEmailMessageWrapper> cf =
                new DefaultKafkaConsumerFactory<>(
                        consumerProps,
                        new StringDeserializer(),
                        new JsonDeserializer<>(KafkaEmailMessageWrapper.class)
                );
        consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton(EMAIL_TOPIC));
        consumer.poll(Duration.ZERO);

        DefaultKafkaProducerFactory<String, KafkaEmailMessageWrapper> pf =
                new DefaultKafkaProducerFactory<>(producerProps);
        producer = pf.createProducer();
    }

    @Test
    public void testNoErrorMessageFlow() {
        KafkaEmailMessageWrapper wrapperMessage = KafkaEmailMessageWrapper.builder()
                .emailRequest(
                    EmailMessage.builder()
                        .body("body")
                        .from("[email protected]")
                        .to(new String[]{"[email protected]"})
                        .subject("hey!")
                        .build()
                )
                .build();

        producer.send(new ProducerRecord<>(EMAIL_TOPIC, "whatever", wrapperMessage));
        producer.flush();

        ConsumerRecord<String, KafkaEmailMessageWrapper> record = KafkaTestUtils.getSingleRecord(consumer, EMAIL_TOPIC);
        KafkaEmailMessageWrapper receivedMessage = record.value();
        assertEquals(wrapperMessage.getEmailRequest().getSubject(), receivedMessage.getEmailRequest().getSubject());
    }

}

Это работает нормально, если я не добавлю assertEquals в поле to, которое в JSON представляет собой список адресов электронной почты, разделенных ;, а в классе - массив строк. Итак, в классе EmailMessage у меня есть поле to, помеченное знаком @JsonDeserializer, указывающим на настраиваемый десериализатор, который разделяется на ; и т. Д. Все работает, когда приложение запущено, только тесты прерываются.

Я попытался изменить приведенный выше код, чтобы изменить Producer<String, KafkaEmailMessageWrapper> producer на private Producer<String, String> producer, а затем в тесте отправить JSON вместо экземпляра KafkaEmailMessageWrapper, но затем я получил исключение:

java.lang.ClassCastException: java.lang.String cannot be cast to com.test.model.KafkaEmailMessageWrapper

Таким образом, похоже, что десериализация из строки JSON в модель по какой-то причине не происходит в этом тестовом сценарии. Я хотел бы, чтобы тест был максимально приближен к реальному варианту использования, поэтому он должен выдавать строковое сообщение, а затем выполнять десериализацию для моего класса модели. Не уверен, почему этого не происходит, любая помощь в понимании того, почему это, будет оценена!

Для завершения это определение прослушивателя сообщений:

@KafkaListener(topics = "email-service")
public void receive(@Payload KafkaEmailMessageWrapper message)

ИЗМЕНИТЬ

Позвольте мне перефразировать проблему, потому что, возможно, я немного запутался. Обзор приложения:

  • использовать сообщение JSON
  • сериализуйте его в KafkaEmailMessageWrapper
  • послать электронное письмо

Это работает, я отправляю JSON в Kafka, он получен (определение слушателя выше) и работает правильно. Единственная проблема, с которой я столкнулся, это во время тестирования.

Я хочу воспроизвести ту же ситуацию в тесте, но когда я сделаю это:

kafkaTemplate.sendDefault("key", "<json message>");

Я получаю:

Cannot handle message; nested exception is  org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.test.model.KafkaEmailMessageWrapper] for GenericMessage

Однако когда я вместо этого отправляю экземпляр KafkaEmailMessageWrapper:

kafkaTemplate.sendDefault("key", kafkaEmailMessageWrapper);

он работает, но тогда он не использует мои @JsonDeserialize аннотации, которые я указал для класса:

@JsonDeserialize(using = SemicolonArrayDeserializer.class)
private String[] to;
@JsonDeserialize(using = SemicolonArrayDeserializer.class)
private String[] cc;

Таким образом, при отправке экземпляра поля to и cc всегда пусты, потому что в этих полях уже есть массивы строк, поэтому логика десериализации разделения по ; ничего не делает. Это метод десериализации:

public String[] deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
    ObjectMapper mapper = (ObjectMapper) p.getCodec();
    JsonNode node = mapper.readTree(p);
    return node.asText().split(";");
}

person michauwilliam    schedule 03.06.2019    source источник
comment
Он просто получает String subject, который представляет тему письма, я думаю, здесь это не совсем актуально   -  person michauwilliam    schedule 04.06.2019
comment
Извините, я имел в виду точно описать, что это означает unless I add assertEquals on the to field - что вы получите?   -  person Gary Russell    schedule 04.06.2019
comment
@GaryRussell, извини, это сложно объяснить, не написав эссе. Я добавил правку к вопросу, надеюсь, теперь это имеет больше смысла   -  person michauwilliam    schedule 05.06.2019


Ответы (1)


Вам нужно показать трассировку стека, но я предполагаю, что, поскольку вы используете @SpringBootTest, реальный потребитель (метод слушателя) также получает сообщение, поэтому вы получаете исключение.

Непонятно, что вы здесь тестируете (своим тестовым Consumer) объектом.

Лучшим тестом было бы, скажем, внедрить службу MailSender в ваш bean-компонент-слушатель, а в тестовом примере внедрить имитацию MailSender, чтобы убедиться, что все работает должным образом.

person Gary Russell    schedule 05.06.2019