Spring Cloud Stream Kafka отправить сообщение

Как я могу отправить сообщение с новой функциональной моделью Spring Cloud Stream Kafka?

Устаревший способ выглядел так.

public interface OutputTopic {

    @Output("output")
    MessageChannel output();
}

@Autowired
OutputTopic outputTopic;

public someMethod() {
    
    outputTopic.output().send(MessageBuilder.build());
}

Но как отправить сообщение в функциональном стиле?

application.yml

spring:
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-out-0:
          destination: output
          binder: kafka
@Configuration
public class Configuration {

    @Bean
    Supplier<Message<String>> process() {
        return () -> {
            return MessageBuilder.withPayload("foo")
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()).build();
        };
    }

Я бы Autowire a MessageChannel, но нет MessageChannel-Bean для процесса, process-out-0, output или чего-то в этом роде. Или я могу отправить сообщение с помощью Supplier-Bean? Может ли кто-нибудь привести мне пример? Большое спасибо!


person Sven Weisker    schedule 15.03.2021    source источник


Ответы (1)


Вы можете использовать StreamBridge или API реактора - см. Отправка произвольных данных на вывод (например, источники, управляемые иностранными событиями)

person Gary Russell    schedule 15.03.2021
comment
EmitterProcessor устарел. Вы знаете новейший способ его использования? - person Sven Weisker; 16.03.2021
comment
StreamBridge, как упомянул Гэри, является рекомендуемым способом. Мы обновим документацию, чтобы удалить все ссылки на EmitterProcessor, - person Oleg Zhurakousky; 16.03.2021