Я в основном успешно настраивал тест с использованием EmbeddedKafka, который создает и использует одно сообщение. Ниже представлена рабочая версия теста:
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka
public class KafkaFlowTest {
@Autowired
EmbeddedKafkaBroker broker;
public static final String EMAIL_TOPIC = "email-service";
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
private Consumer<String, KafkaEmailMessageWrapper> consumer;
private Producer<String, KafkaEmailMessageWrapper> producer;
@Before
public void setUp() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "true", broker);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
DefaultKafkaConsumerFactory<String, KafkaEmailMessageWrapper> cf =
new DefaultKafkaConsumerFactory<>(
consumerProps,
new StringDeserializer(),
new JsonDeserializer<>(KafkaEmailMessageWrapper.class)
);
consumer = cf.createConsumer();
consumer.subscribe(Collections.singleton(EMAIL_TOPIC));
consumer.poll(Duration.ZERO);
DefaultKafkaProducerFactory<String, KafkaEmailMessageWrapper> pf =
new DefaultKafkaProducerFactory<>(producerProps);
producer = pf.createProducer();
}
@Test
public void testNoErrorMessageFlow() {
KafkaEmailMessageWrapper wrapperMessage = KafkaEmailMessageWrapper.builder()
.emailRequest(
EmailMessage.builder()
.body("body")
.from("[email protected]")
.to(new String[]{"[email protected]"})
.subject("hey!")
.build()
)
.build();
producer.send(new ProducerRecord<>(EMAIL_TOPIC, "whatever", wrapperMessage));
producer.flush();
ConsumerRecord<String, KafkaEmailMessageWrapper> record = KafkaTestUtils.getSingleRecord(consumer, EMAIL_TOPIC);
KafkaEmailMessageWrapper receivedMessage = record.value();
assertEquals(wrapperMessage.getEmailRequest().getSubject(), receivedMessage.getEmailRequest().getSubject());
}
}
Это работает нормально, если я не добавлю assertEquals в поле to
, которое в JSON представляет собой список адресов электронной почты, разделенных ;
, а в классе - массив строк. Итак, в классе EmailMessage
у меня есть поле to
, помеченное знаком @JsonDeserializer
, указывающим на настраиваемый десериализатор, который разделяется на ;
и т. Д. Все работает, когда приложение запущено, только тесты прерываются.
Я попытался изменить приведенный выше код, чтобы изменить Producer<String, KafkaEmailMessageWrapper> producer
на private Producer<String, String> producer
, а затем в тесте отправить JSON вместо экземпляра KafkaEmailMessageWrapper
, но затем я получил исключение:
java.lang.ClassCastException: java.lang.String cannot be cast to com.test.model.KafkaEmailMessageWrapper
Таким образом, похоже, что десериализация из строки JSON в модель по какой-то причине не происходит в этом тестовом сценарии. Я хотел бы, чтобы тест был максимально приближен к реальному варианту использования, поэтому он должен выдавать строковое сообщение, а затем выполнять десериализацию для моего класса модели. Не уверен, почему этого не происходит, любая помощь в понимании того, почему это, будет оценена!
Для завершения это определение прослушивателя сообщений:
@KafkaListener(topics = "email-service")
public void receive(@Payload KafkaEmailMessageWrapper message)
ИЗМЕНИТЬ
Позвольте мне перефразировать проблему, потому что, возможно, я немного запутался. Обзор приложения:
- использовать сообщение JSON
- сериализуйте его в
KafkaEmailMessageWrapper
- послать электронное письмо
Это работает, я отправляю JSON в Kafka, он получен (определение слушателя выше) и работает правильно. Единственная проблема, с которой я столкнулся, это во время тестирования.
Я хочу воспроизвести ту же ситуацию в тесте, но когда я сделаю это:
kafkaTemplate.sendDefault("key", "<json message>");
Я получаю:
Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.test.model.KafkaEmailMessageWrapper] for GenericMessage
Однако когда я вместо этого отправляю экземпляр KafkaEmailMessageWrapper
:
kafkaTemplate.sendDefault("key", kafkaEmailMessageWrapper);
он работает, но тогда он не использует мои @JsonDeserialize
аннотации, которые я указал для класса:
@JsonDeserialize(using = SemicolonArrayDeserializer.class)
private String[] to;
@JsonDeserialize(using = SemicolonArrayDeserializer.class)
private String[] cc;
Таким образом, при отправке экземпляра поля to и cc всегда пусты, потому что в этих полях уже есть массивы строк, поэтому логика десериализации разделения по ;
ничего не делает. Это метод десериализации:
public String[] deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
ObjectMapper mapper = (ObjectMapper) p.getCodec();
JsonNode node = mapper.readTree(p);
return node.asText().split(";");
}
String subject
, который представляет тему письма, я думаю, здесь это не совсем актуально - person michauwilliam   schedule 04.06.2019unless I add assertEquals on the to field
- что вы получите? - person Gary Russell   schedule 04.06.2019