Spring Cloud Stream - изменение сообщений DLQ

Я использую функцию DLQ Spring Cloud Stream со связывателем Kafka. Когда обработка сообщения завершается неудачно, сообщение отправляется в DLQ, как и ожидалось, однако я хочу иметь возможность изменить сообщение, отправляемое в DLQ, чтобы включить некоторую дополнительную диагностическую информацию. Проблема в том, что сообщения, отправленные в DLQ, являются исходными сообщениями; любые сделанные мной мутации игнорируются. Мой подход к решению этой проблемы до сих пор заключался в том, чтобы перехватить сообщение перед его отправкой в ​​DLQ и добавить дополнительную информацию, которая хранится в другом bean-компоненте. В частности, я пробовал эти два подхода:

  1. Решение: реализовать простой Kafka ProducerInterceptor для DLQ. Проблема: реализация создается вне контекста Spring, поэтому я не могу внедрить другие bean-компоненты, которые мне нужны. Spring Kafka задокументировала это решение, однако для этого требуется создание нового ProducerFactory, что означает, что я не могу использовать компонент из базового Spring Cloud Stream.
  2. Решение: внедрите Spring ChannelInterceptor. Проблема: я не могу получить ссылку на канал сообщений для DLQ или имя основного канала, поэтому я не могу настроить перехватчик только для сообщений DLQ.

Есть идеи, как это можно решить?


person Boon    schedule 09.06.2020    source источник


Ответы (1)


Проблема: я не могу получить ссылку на канал сообщений для DLQ или имя основного канала, поэтому я не могу настроить перехватчик только для сообщений DLQ.

Имя канала ошибки

destinationName.group.errors

Вы можете получить его как AbstractMessageChannel из контекста приложения ...

context.getBean("destinationName.group.errors", AbstractMessageChannel.class)

... и добавить перехватчик.

В качестве альтернативы вообще не используйте механизм DLQ связующего и добавьте свой собственный обработчик ошибок:

@ServiceActivator(inputChannel = "destinationName.group.errors")
void errors(Message<?> error) {
    ...
}
person Gary Russell    schedule 09.06.2020
comment
Спасибо за информацию. Я использую собственное название темы DLQ; Я посмотрю, работает ли это еще. Было бы здорово использовать собственный механизм DLQ, но у проекта уже есть зависимости от Spring, поэтому я застрял на этом. - person Boon; 09.06.2020
comment
Хм, не могу получить ссылку на канал DLQ в этом примере. Единственный AbstractMessageChannel в контексте - это errorChannel, и он принимает MessagingException сообщения, а не исходящие сообщения DLQ. Я также проверил контекст для MessageChannel beans, и я тоже не вижу его. - person Boon; 09.06.2020
comment
Вам нужно дождаться регистрации привязки; вы не можете просто подключить его автоматически; вы можете добавить SmartLifecycle @Bean поместить его в более позднюю (более высокую) «фазу», чем фаза, в которой создается привязка (это Integer.MAX_VALUE - 1000). autoStartup должен быть true, чтобы Spring вызывал метод start() (куда вы должны поместить свой код). - person Gary Russell; 09.06.2020