Как отправить сообщение на Кафку

Я новичок в реактивном программировании и пытаюсь реализовать самый простой сценарий. Я хочу отправлять сообщение в kafka каждый раз, когда файл помещается в определенную папку. Я думаю, что плохо понимаю основы ... так что, пожалуйста, не могли бы вы мне помочь?

Итак, у меня есть несколько вопросов: в чем разница между smallrye-reactive-messaging и smallrye-reactive-streams-операторами?

У меня есть такой простой код:

@Outgoing( "my-topic" )
public PublisherBuilder<Message<MessageWrapper>> generate() {
     if(Objects.isNull(currentMessage)){
          //currentMessage is an instance variable which is null when I start the application
          return ReactiveStreams.of(new MessageWrapper()).map(Message::of);
     }
     else {
          //currentMessage has been correctly set with the file information
          LOGGER.info(currentMessage);
          return ReactiveStreams.of(currentMessage).map(Message::of);
     }
}

Когда код входит в оператор if, все в порядке, и я получил сериализацию JSON моего объекта с нулевыми значениями. Однако я не понимаю, почему, когда мой код переходит к оператору else, ничего не переходит в тему? Кажется, что инструкции .of оператора if нарушили потоки или что-то в этом роде ...

Как сохранить непрерывные потоки, которые «реагируют» на новые отброшенные файлы? (или другие события, такие как HTTP-запрос GET или что-то в этом роде) ...

Если я не верну экземпляр PublisherBuilder, а, например, целое число, тогда моя тема kafka будет заполнена очень огромным потоком целочисленного значения. Вот почему в примерах используются некоторые интервалы при отправке сообщений ...

Что мне следует использовать: CompletationStage или CompletableFuture? RxJAva2? Немного сбивает с толку, какую библиотеку использовать (vertx, smallrye, rxjava2, microprofile, ...)

В чем разница между:

  • ReactiveStreams.fromCompletionStage
  • ReactiveStreams.fromProcessor
  • ReactiveStreams.fromPublisher
  • ReactiveStreams.fromSubscriber

Какой использовать в каком сценарии?

Большое спасибо !


person bertranddeweer    schedule 19.02.2020    source источник


Ответы (1)


Начнем с разницы между операторами smallrye-reactive-messaging и smallrye-reactive-streams: операторы smallrye-reactive-streams - это то же самое, что smallrye-reactive-messaging, но, кроме того, он поддерживает MicroProfile-context-duplication. Поскольку большинство провайдеров реактивного обмена сообщениями используют Vert.x за сценой, он будет обрабатывать ваше сообщение в стиле цикла событий, что означает, что оно будет выполняться в отдельном потоке. Иногда вам нужно распространить некоторый ctx из вашего базового потока в новый поток (например, заполнение контекста CDI и Tx для выполнения некоторой логики диспетчера сущностей JPA). Вот где помощь по распространению ctx.

Для сигнатур методов. Вы можете ознакомиться с официальной документацией разделов SmallRye-reactive-streams 3 , 4 и 5. У каждого свой сценарий использования. Вам решать, какой аромат вы хотите использовать.

Когда что использовать? Если вы не работаете в реактивном контексте, вы можете использовать нижеприведенное для отправки сообщений.

@Inject @Channel ("мой-канал") Emitter emitter;

Для потребления сообщений вы можете использовать подпись метода следующим образом:

@Incoming ("канал-2") public CompletionStage doSomething (сообщение anEvent)

Or

@Incoming ("канал-2") public void doSomething (String anEvent)

Надеюсь, это поможет.

person iabughosh    schedule 19.02.2020