Spring Cloud Kafka Streams динамическое преобразование сообщений на основе информации заголовка

Я пытаюсь использовать Spring Cloud Kafka Streams для обработки сообщений из темы Kafka, содержащей различные типы сообщений. Например, мы получаем сообщение JSON из темы, которое может быть сообщением типа A или типа B. Производитель добавляет тип сообщения в заголовок, есть ли способ прочитать эту информацию заголовка в функциональном связывателе и соответствующим образом преобразовать сообщение? Или также есть опция «Выбор» для ветвления по мере поступления сообщений, чтобы направить сообщение к нужному конвертеру?


person AlexCon    schedule 07.02.2020    source источник


Ответы (1)


Если вы настраиваете привязку для использования nativeDecoding, десериализация выполняется Kafka (через свойство потребителя value.deserializer).

spring-kafka предоставляет JsonDeserializer, который ищет информацию о типе в определенных заголовках (установленных соответствующим JsonSerializer.

Он также предоставляет DelegatingDeserializer, который позволяет вам выбрать, какой десериализатор использовать, на основе значения в заголовке spring.kafka.serialization.selector.

См. Справочное руководство Spring для Apache Kafka для получения дополнительной информации.

person Gary Russell    schedule 07.02.2020