Перехватить входящее сообщение Spring Cloud Stream SubscribableChannel

Поскольку метод postReceive из org.springframework.messaging.support.ChannelInterceptor не вызывается в org.springframework.messaging.SubscribableChannel. Есть ли способ перехватить все входящие сообщения для метода annotated@StreamListener(Sink.INPUT)?

Например:

Перехватите сообщение, прежде чем перейти к методу handle

@StreamListener(Sink.INPUT)
public void handle(Foo foo) {
    // ...
}

Ниже моя настройка для Spring Cloud Stream.

public interface EventSink {

    String INPUT1 = "input1";
    String INPUT2 = "input2";

    @Input(INPUT1)
    SubscribableChannel input1();

    @Input(INPUT2)
    SubscribableChannel input2();   
}

public interface EventSource {

    String OUTPUT1 = "output1";
    String OUTPUT2 = "output2";

    @Output(OUTPUT1)
    MessageChannel output1();

    @Output(OUTPUT2)
    MessageChannel output2()';
}

spring:
  cloud:
    stream:
      bindings:
        input1:
          destination: input1
        input2:
          destination: input2     
        output1:
          destination: output1
        output2:
          destination: output2

public class EventHandler {

    @StreamListener(EventSink.INPUT1)
    public void handle(Foo1 foo) {
        // ...
    }

    @StreamListener(EventSink.INPUT2)
    public void handle(Foo2 foo) {
        // ...
    }

}

@Service
public class Bar1Service {

    @Autowired
    private EventSource source;

    public void bar1() {
        source.output1().send(MessageBuilder.withPayload("bar1").build());
    }

}

@Service
public class Bar2Service {

    @Autowired
    private EventSource source;

    public void bar2() {
        source.output2().send(MessageBuilder.withPayload("bar2").build());
    }

}

person user1831877    schedule 04.01.2017    source источник
comment
Что ты хочешь делать в перехватчике? Вы можете перехватить preSend().   -  person Gary Russell    schedule 04.01.2017
comment
Я хотел бы извлечь и подготовить некоторую информацию из заголовков и сохранить их в ThreadLocal. Я думал, что preSend для вывода.   -  person user1831877    schedule 04.01.2017
comment
preSend() подходит, но смотрите мой ответ.   -  person Gary Russell    schedule 04.01.2017


Ответы (1)


С помощью DirectChannel связующее вызывает вашего слушателя в том же потоке, поэтому здесь подходит preSend.

Однако вам не нужно возиться с ThreadLocal, вы можете получить доступ к заголовкам, используя сигнатуру метода...

@StreamListener(Processor.INPUT)
public void handle(Foo foo, @Header("bar") String bar) {
    ...
}

ИЗМЕНИТЬ

@EnableBinding(Processor.class)
public class So41459187Application {

    public static void main(String[] args) {
        SpringApplication.run(So41459187Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String handle(String in) {
        return in.toUpperCase();
    }

    @Configuration
    public static class Config {

        @Bean
        public BeanPostProcessor channelConfigurer() {
            return new BeanPostProcessor() {

                @Override
                public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                    return bean;
                }

                @Override
                public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                    if ("input".equals(beanName)) {
                        ((AbstractMessageChannel) bean).addInterceptor(new ChannelInterceptorAdapter() {

                            @Override
                            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                                System.out.println("PreSend on INPUT: " + message);
                                return message;
                            }

                        });
                    }
                    else if ("output".equals(beanName)) {
                        ((AbstractMessageChannel) bean).addInterceptor(new ChannelInterceptorAdapter() {

                            @Override
                            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                                System.out.println("PreSend on OUTPUT: " + message);
                                return message;
                            }

                        });
                    }
                    return bean;
                }

            };
        }
    }

}
person Gary Russell    schedule 04.01.2017
comment
Спасибо за ответ, оценил! Я использую ThreadLocal, потому что есть несколько методов handle, прослушивающих разные каналы, поэтому я искал глобальный способ получения информации из заголовков для всех методов handle. Так что мне не нужно ставить @Headers или MessageHeaders на каждый из handle методов. - person user1831877; 05.01.2017
comment
И я подтвердил, что preSend подходит. (Еще раз спасибо) Но теперь я в замешательстве. И входящие, и исходящие каналы будут вызывать метод preSend. Для входящего канала мне нужно извлечь информацию из заголовков, а для исходящего канала мне нужно установить информацию в заголовках для передачи потребителям. Как я могу разделить эти два бизнеса? - person user1831877; 05.01.2017
comment
Или, может быть, я могу настроить разные перехватчики для производителя и потребителя на привязки? - person user1831877; 05.01.2017
comment
Да, вам просто нужно настроить разные перехватчики на каждом канале. - person Gary Russell; 05.01.2017
comment
Не могли бы вы показать мне, как я могу сделать это в весеннем облачном потоке? Поскольку канал автоматически генерируется в весеннем облачном потоке, и я не могу найти способ настроить это. - person user1831877; 05.01.2017
comment
Как вы сейчас настраиваете свой перехватчик? - person Gary Russell; 05.01.2017
comment
Есть несколько способов сделать это; Я отредактировал свой ответ с одним из них. - person Gary Russell; 05.01.2017
comment
Я использую @GlobalChannelInterceptor прямо сейчас, потому что я не нашел способ настроить перехватчик. Я обновил свои настройки для весеннего облачного потока в вопросе выше. - person user1831877; 05.01.2017
comment
При использовании глобальных перехватчиков вы можете просто использовать свойство patterns, чтобы определить, к каким именам каналов применяется перехватчик. - person Gary Russell; 05.01.2017
comment
Благодарю вас! Оба решения идеальны! - person user1831877; 06.01.2017
comment
@user1831877 user1831877 Можете ли вы поделиться, как выглядит код? у меня есть аналогичный вариант использования, когда мне нужно обрабатывать заголовки сообщений, поступающих в мой SubscribableChannel, и устанавливать локальный поток. - person java_geek; 13.05.2020
comment
@GaryRussell Разве дополнительная проверка для каждого имени компонента не является накладной? Есть ли альтернативный способ сделать это через АОП; Я нашел эту ссылку stackoverflow.com/questions/50682722/; Однако ChannelInterceptorManager, похоже, недоступен в весеннем облачном потоке 3.0.1. пожалуйста, порекомендуйте - person java_geek; 13.05.2020
comment
Постпроцессор компонента postProcessAfterInitialization() запускается только один раз, во время инициализации для добавления перехватчиков только один раз. Это более эффективно, чем АОП. - person Gary Russell; 13.05.2020