Как справиться с ошибкой сериализации в связывателе потоков Spring Cloud Stream Kafka?

Я пишу приложение потоков Kafka, используя связку потоков Kafka Stream Cloud Spring.

Пока потребитель публикует сообщение в теме вывода, может возникнуть ошибка, например Ошибка сериализации или Ошибка сети.

В этом коде -

@Bean
public Function<KStream<Object, String>, KStream<Object, String>> process() {
    return (input) -> {
        KStream<Object, String> kt = input.flatMapValues(v -> Arrays.asList(v.toUpperCase().split("\\W+")));
        return kt;
    };
}

Здесь при создании сообщения обратно в тему вывода, если возникает ошибка, как с ней справиться. Есть ли в связывателе потоков Kafka какой-либо другой механизм, кроме RetryTemplate?


comment
@sobychacko, не могли бы вы помочь?   -  person shreyas.k    schedule 08.06.2020
comment
Привет, может быть, есть несколько способов обойти это в подшивке. Вы можете попробовать использовать StreamsBuilderFactoryBeanCustomizer и предоставить различные хуки для обработки исключений производителя. См. Этот блог: spring.io/blog/2019/12/06/   -  person sobychacko    schedule 08.06.2020


Ответы (1)


См. Spring для Apache Документация Kafka.

Когда десериализатору не удается десериализовать сообщение, Spring не может решить проблему, потому что она возникает до того, как poll () вернется. Чтобы решить эту проблему, в версии 2.2 появился ErrorHandlingDeserializer2. Этот десериализатор делегирует полномочия реальному десериализатору (ключу или значению). Если делегату не удается десериализовать содержимое записи, ErrorHandlingDeserializer2 возвращает нулевое значение и DeserializationException в заголовке, который содержит причину и необработанные байты. Когда вы используете MessageListener на уровне записи, если ConsumerRecord содержит заголовок DeserializationException для ключа или значения, вызывается ErrorHandler контейнера с ошибкой ConsumerRecord. Запись слушателю не передается.

person Gary Russell    schedule 08.06.2020