Я новичок в реактивном программировании и пытаюсь реализовать самый простой сценарий. Я хочу отправлять сообщение в kafka каждый раз, когда файл помещается в определенную папку. Я думаю, что плохо понимаю основы ... так что, пожалуйста, не могли бы вы мне помочь?
Итак, у меня есть несколько вопросов: в чем разница между smallrye-reactive-messaging и smallrye-reactive-streams-операторами?
У меня есть такой простой код:
@Outgoing( "my-topic" )
public PublisherBuilder<Message<MessageWrapper>> generate() {
if(Objects.isNull(currentMessage)){
//currentMessage is an instance variable which is null when I start the application
return ReactiveStreams.of(new MessageWrapper()).map(Message::of);
}
else {
//currentMessage has been correctly set with the file information
LOGGER.info(currentMessage);
return ReactiveStreams.of(currentMessage).map(Message::of);
}
}
Когда код входит в оператор if, все в порядке, и я получил сериализацию JSON моего объекта с нулевыми значениями. Однако я не понимаю, почему, когда мой код переходит к оператору else, ничего не переходит в тему? Кажется, что инструкции .of оператора if нарушили потоки или что-то в этом роде ...
Как сохранить непрерывные потоки, которые «реагируют» на новые отброшенные файлы? (или другие события, такие как HTTP-запрос GET или что-то в этом роде) ...
Если я не верну экземпляр PublisherBuilder, а, например, целое число, тогда моя тема kafka будет заполнена очень огромным потоком целочисленного значения. Вот почему в примерах используются некоторые интервалы при отправке сообщений ...
Что мне следует использовать: CompletationStage или CompletableFuture? RxJAva2? Немного сбивает с толку, какую библиотеку использовать (vertx, smallrye, rxjava2, microprofile, ...)
В чем разница между:
- ReactiveStreams.fromCompletionStage
- ReactiveStreams.fromProcessor
- ReactiveStreams.fromPublisher
- ReactiveStreams.fromSubscriber
Какой использовать в каком сценарии?
Большое спасибо !