Как динамически добавлять элементы в горячий флюс реактора другим способом?

У меня есть служба источника данных, которая принимает в качестве параметра наблюдателя.

void subscribe(Consumer onEventConsumer);

Я хочу использовать поток в качестве потока ответов для RSocket. Как я могу это сделать? Как я вижу сейчас, это должно быть что-то вроде

Flux<T> controllerMethod(RequestMessage mgs) {
   var flux = Flux.empty();
   dataSource.subscribe(event -> flux.push(event));
   return flux;
}

Но у меня есть большие сомнения, что это правильное решение, и я новичок в реактивном подходе, я не знаю, какие методы мне здесь следует использовать?


person Mr.Ustiik    schedule 07.05.2020    source источник


Ответы (3)


Как уже указывал Саймон, это то, для чего вы используете Flux.create.

Ознакомьтесь с Руководством по началу работы на projectreactor.io.

В кадре вы регистрируете настраиваемый прослушиватель внутри лямбды метода create:

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( 
      new MyEventListener<String>() { 

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); 
          }
        }

        public void processComplete() {
            sink.complete(); 
        }
    });
});

Что вы хотите сделать, так это передать входящие элементы в FluxSink, который затем опубликует эти элементы в Flux.

person kerner1000    schedule 24.01.2021

У меня была аналогичная ситуация, когда мне нужно было потреблять и передавать сообщения в потоке. Итак, это упрощенная версия, комментарии в коде:

public class Main {
    // create queue for storing the messages
    private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(50);
    private static final Consumer<String> consumer = s -> {
        // block another spawned thread if no more space is present
        try { queue.put(s); } catch (InterruptedException e) {}
    };

    public static void main(String[] args) throws Exception {
        // size shouldn't be more than our queue size
        IntStream.range(0, 50).forEach(Main::consume);
        fluxGenerator().subscribe(); // subscribe to flux
        // write to the queue after subscribing
        // here the size can be anything as long as subscriber can handle it
        IntStream.range(50, 100000).forEach(Main::consume);
    }

    static Flux<String> fluxGenerator() {
        return Flux.<String>generate(sink -> {
            // block another spawned thread if no more elements are present
            try {sink.next(queue.take()); } catch (InterruptedException e) {}
        })
                // we need to subscribe on another thread
                .subscribeOn(Schedulers.newSingle("async"))
                .log();
    }

    static void consume(String str) {
        consumer.accept(str); // consume the messages
    }

    static void consume(Number i) {
        consume("" + i);
    }

}
person Aniket Sahrawat    schedule 08.05.2020
comment
Спасибо за ответ, но проблема в том, что я хочу добавить Flux, чтобы избежать очереди. - person Mr.Ustiik; 10.05.2020
comment
@ Mr.Ustiik У вас должно быть что-то среднее между конвейерами сообщений, например, очередь. Не стоит ожидать, что мы напишем весь код. - person Aniket Sahrawat; 10.05.2020

это типичный вариант использования Flux.create. вы регистрируете наблюдателя внутри лямбда-выражения create, который будет передавать полученные данные в предоставленный FluxSink

person Simon Baslé    schedule 09.05.2020
comment
Верно ли ваше предложение в случае, когда мы хотим добавить элемент к потоку после вызова Http? то есть клиент отправляет запрос POST в конечную точку с сообщением, которое он хочет опубликовать. Спасибо - person Saad Moumen; 01.12.2020
comment
это звучит как совершенно другой вариант использования. в случае OP один Flux должен отражать одного слушателя, установленного путем вызова (как я понимаю, нереактивного) subscriber(Consumer). в вашем случае это похоже на то, что несколько вызовов этой конечной точки должны публиковаться в одном и том же Flux. Посмотрите на Sinks API в 3.4.0 для этого. - person Simon Baslé; 02.12.2020
comment
Это именно то, что я сделал! Он НАСТОЛЬКО хорошо сделан и, кажется, охватывает безопасное производство из нескольких потоков. Спасибо, Саймон. - person Saad Moumen; 03.12.2020