Как я могу отправить сообщение с новой функциональной моделью Spring Cloud Stream Kafka?
Устаревший способ выглядел так.
public interface OutputTopic {
@Output("output")
MessageChannel output();
}
@Autowired
OutputTopic outputTopic;
public someMethod() {
outputTopic.output().send(MessageBuilder.build());
}
Но как отправить сообщение в функциональном стиле?
application.yml
spring:
cloud:
function:
definition: process
stream:
bindings:
process-out-0:
destination: output
binder: kafka
@Configuration
public class Configuration {
@Bean
Supplier<Message<String>> process() {
return () -> {
return MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()).build();
};
}
Я бы Autowire a MessageChannel, но нет MessageChannel-Bean для процесса, process-out-0, output или чего-то в этом роде. Или я могу отправить сообщение с помощью Supplier-Bean? Может ли кто-нибудь привести мне пример? Большое спасибо!