Я использую функцию DLQ Spring Cloud Stream со связывателем Kafka. Когда обработка сообщения завершается неудачно, сообщение отправляется в DLQ, как и ожидалось, однако я хочу иметь возможность изменить сообщение, отправляемое в DLQ, чтобы включить некоторую дополнительную диагностическую информацию. Проблема в том, что сообщения, отправленные в DLQ, являются исходными сообщениями; любые сделанные мной мутации игнорируются. Мой подход к решению этой проблемы до сих пор заключался в том, чтобы перехватить сообщение перед его отправкой в DLQ и добавить дополнительную информацию, которая хранится в другом bean-компоненте. В частности, я пробовал эти два подхода:
- Решение: реализовать простой Kafka
ProducerInterceptor
для DLQ. Проблема: реализация создается вне контекста Spring, поэтому я не могу внедрить другие bean-компоненты, которые мне нужны. Spring Kafka задокументировала это решение, однако для этого требуется создание новогоProducerFactory
, что означает, что я не могу использовать компонент из базового Spring Cloud Stream. - Решение: внедрите Spring
ChannelInterceptor
. Проблема: я не могу получить ссылку на канал сообщений для DLQ или имя основного канала, поэтому я не могу настроить перехватчик только для сообщений DLQ.
Есть идеи, как это можно решить?