Как перехватить сообщения Spring Cloud Stream?

Spring позволяет перехватывать сообщения для многих своих продуктов, таких как RestTemplate и SpringMVC. Можно ли перехватить сообщения Spring Cloud Stream? Как для входящих, так и для исходящих сообщений.


person Jose Martinez    schedule 26.05.2016    source источник


Ответы (2)


Не уверен, что вы имеете в виду под перехватом - оба приведенных вами примера не основаны на сообщениях :).

Но вы хотите получить доступ к полному сообщению, вы можете использовать это как аргумент для @StreamListener или @ServiceActivator-аннотированного метода. Кроме того, Spring Cloud Stream позволяет настроить полный конвейер интеграции Spring, чтобы вы могли добавлять советы и все необходимое - см. Здесь: https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference.

Я бы посоветовал вам также взглянуть на справочник по интеграции Spring http://docs.spring.io/autorepo/docs/spring-integration/4.2.6.RELEASE/reference/html/. Spring Cloud Stream автоматически вводит каналы, и оттуда у вас есть полная свобода в том, как вы строите свой конвейер.

Надеюсь, это поможет, Мариус

person Marius Bogoevici    schedule 26.05.2016
comment
Да, я начал читать Spring Integration несколько часов назад и думаю, что ответ - GlobalChannelInterceptor. Перехватчик - это распространенный паттерн проектирования. в основном это когда вы получаете запрос или сообщение до или после его обработки. - person Jose Martinez; 26.05.2016
comment
Да, в этом случае @GlobalChannelInterceptor должен быть полезен. - person Marius Bogoevici; 26.05.2016

Был способен перехватывать входящие и исходящие сообщения Spring Cloud Stream с помощью аннотации GlobalChannelInterceptor и интерфейса ChannelInterceptor. См. Образец ниже.

import org.springframework.integration.config.GlobalChannelInterceptor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;

@Component
@GlobalChannelInterceptor
public class Interceptor implements ChannelInterceptor {

    private final Logger log = LoggerFactory.getLogger(Interceptor.class);

    @Override
    public Message<?> preSend(Message<?> msg, MessageChannel mc) {
        log.info("In preSend");
        return msg;
    }

    @Override
    public void postSend(Message<?> msg, MessageChannel mc, boolean bln) {
        log.info("In postSend");
    }

    @Override
    public void afterSendCompletion(Message<?> msg, MessageChannel mc, boolean bln, Exception excptn) {
        log.info("In afterSendCompletion");
    }

    @Override
    public boolean preReceive(MessageChannel mc) {
        log.info("In preReceive");
        return true;
    }

    @Override
    public Message<?> postReceive(Message<?> msg, MessageChannel mc) {
        log.info("In postReceive");
        return msg;
    }

    @Override
    public void afterReceiveCompletion(Message<?> msg, MessageChannel mc, Exception excptn) {
        log.info("In afterReceiveCompletion");
    }

}
person Jose Martinez    schedule 26.05.2016
comment
Не работает на стороне приемника кролика весеннего облачного потока, поскольку связующее устройство весеннего кролика не является реализацией PollableChannel - person Timmy Chiu; 06.07.2019
comment
Если он не работает со стороной приемника кролика весеннего облачного потока, какие другие варианты мне нужны, чтобы перехватить сообщение и прочитать, например, данные корреляции из заголовка сообщения для инициализации MDC. - person Ray; 27.05.2020