Есть ли интеграция весенней облачной функции webflux + весенний облачный поток с http-источником

Я пытаюсь интегрировать весенний облачный поток с весенней облачной функцией webflux

поскольку они не рекомендуют реактивные потоки Spring Cloud в будущих выпусках, я пытаюсь использовать функции Spring Cloud https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.2.RELEASE/single/spring-cloud-stream.html#spring-cloud-stream-preface-notable-deprecations.

Облачная веб-функция Spring может отображать конечную точку своей функции с помощью путей, как в документе

https://cloud.spring.io/spring-cloud-static/spring-cloud-function/1.0.0.RELEASE/single/spring-cloud-function.html

из облачного потока я вижу, что источник должен быть определен как поставщик https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.2.RELEASE/single/spring-cloud-stream.html#_spring_cloud_function

но мой вариант использования - получить данные POST из реактивной конечной точки http и принять их в kafka, есть ли способ достичь этого из веб-функции весеннего облака и весеннего облачного потока?

из документа для функции весенних облаков с потоком весенних облаков

@SpringBootApplication
@EnableBinding(Source.class)
public static class SourceFromSupplier {
    public static void main(String[] args) {
        SpringApplication.run(SourceFromSupplier.class, "--spring.cloud.stream.function.definition=date");
    }
    @Bean
    public Supplier<Date> date() {
        return () -> new Date(12345L);
    }
}

и если я запустил это, я вижу, что дата вставляется в kafka каждую 1 секунду, и если я вызываю конечную точку получения для поставщика, например localhost: / 8080 / date, в ответ на дату, есть ли способ ввести paylaod из сообщения в кафка с функцией весеннего облака?


person jay    schedule 27.03.2019    source источник


Ответы (1)


Существует проблема, которую помог выявить ваш вопрос, и она связана с несогласованностью жизненного цикла между автоконфигурациями, предоставляемыми функцией и потоком. Проблема проявляется в том, что точка покоя, созданная Spring Cloud Functions, не может видеть привязки, поскольку она создана намного раньше.

Так что мы рассмотрим этот вопрос в ближайшее время. Между тем существует обходной путь, который потребует от вас доступа к каналу output из ApplicationContext (см. Ниже):

@SpringBootApplication
@EnableBinding(Source.class)
public class SimpleFunctionRabbitDemoApplication {

  public static void main(String[] args) throws Exception {      
    SpringApplication.run(SimpleFunctionRabbitDemoApplication.class);
  }

  @Bean
  public Consumer<String> storeSync(ApplicationContext context) {
     return v -> {
        MessageChannel channel = context.getBean(Source.OUTPUT, MessageChannel.class);
        channel.send(MessageBuilder.withPayload(v).build());
     };
  }
}
person Oleg Zhurakousky    schedule 27.03.2019
comment
Спасибо за быстрый ответ, но все же я не могу вводить потоки kafka, я выставил bean-компонент типа Consumer для функции весеннего облака, и когда я отправляю ему некоторые данные, я вижу данные в функции, но когда я вызываю theink.input (). send (MessageBuilder.withPayload (paylaod) .build ()) Мне не удалось отправить сообщение в kafka, но я получил истину, когда я его регистрирую. Я использую выпуск Spring Boot 2.1.3, Spring Cloud Stream Fishtown.SR2, springCloudFunctionVersion 2.0.1.RELEASE - person jay; 27.03.2019
comment
Подумайте о создании небольшого образца, который демонстрирует вашу проблему, и отправьте i на GitHub, чтобы мы могли посмотреть - person Oleg Zhurakousky; 27.03.2019
comment
привет, пожалуйста, найдите мой образец на github [github.com/jayasai470/spring-sample- облако-поток-функция] - person jay; 27.03.2019
comment
попробуйте это, пожалуйста, github.com/jayasai470/spring-sample-cloud-stream-function - person jay; 27.03.2019
comment
Хорошо, я посмотрю завтра, а пока, пожалуйста, обновите Stream, который только что был выпущен как 2.2.0.M1 - spring.io/blog/2019/03/26/ - person Oleg Zhurakousky; 27.03.2019
comment
Какие-либо предложения? - person jay; 29.03.2019
comment
Да, я только что обновил ответ и поднял проблему в потоке - github.com/spring-cloud/spring-cloud-stream/issues/1678 - person Oleg Zhurakousky; 29.03.2019
comment
Просто попробовал ваши предложения, но безуспешно, я внес изменения в свое репо, он по-прежнему ведет себя так же, как когда я запускаю свой rest api, он застревает в бесконечном цикле из журналов, я вижу канал. Send вернул истину, хотя я не видел сообщений в кафке - person jay; 29.03.2019
comment
Мне просто пришлось удалить мой --spring.cloud.stream.function.definition = storeSync, и он сработал, спасибо за предложение - person jay; 29.03.2019
comment
channel.send - это блокирующий вызов, есть ли какой-либо реактивный API для вставки в поток kafka, я могу найти его только в doc cloud.spring.io/spring-cloud-static/spring-cloud-stream/ - person jay; 29.03.2019
comment
Я предлагаю обсудить проблему в spring-cloud-stream. Я тоже думал об этом, но похоже, что нам нужно что-то реализовать - person Oleg Zhurakousky; 29.03.2019
comment
хорошо, просто открыл его github.com/spring-cloud/spring-cloud- stream / issues / 1679 - person jay; 29.03.2019
comment
Ошибки вставки на уровне связывателя kafka не выбрасываются в облачную функцию webflux, есть ли способ получить исключение в методе channel.send? - person jay; 30.03.2019