Я пытаюсь использовать Spring Cloud Kafka Streams для обработки сообщений из темы Kafka, содержащей различные типы сообщений. Например, мы получаем сообщение JSON из темы, которое может быть сообщением типа A или типа B. Производитель добавляет тип сообщения в заголовок, есть ли способ прочитать эту информацию заголовка в функциональном связывателе и соответствующим образом преобразовать сообщение? Или также есть опция «Выбор» для ветвления по мере поступления сообщений, чтобы направить сообщение к нужному конвертеру?
Spring Cloud Kafka Streams динамическое преобразование сообщений на основе информации заголовка
Ответы (1)
Если вы настраиваете привязку для использования nativeDecoding
, десериализация выполняется Kafka (через свойство потребителя value.deserializer
).
spring-kafka предоставляет JsonDeserializer
, который ищет информацию о типе в определенных заголовках (установленных соответствующим JsonSerializer
.
Он также предоставляет DelegatingDeserializer
, который позволяет вам выбрать, какой десериализатор использовать, на основе значения в заголовке spring.kafka.serialization.selector
.
См. Справочное руководство Spring для Apache Kafka для получения дополнительной информации.
person
Gary Russell
schedule
07.02.2020