Многопоточность в Reactor 2.0 — почему я не могу передавать сигналы в несколько потоков

У меня проблемы с версией реактора 2.0. А именно, я пытаюсь настроить реактивный поток сигналов, который разветвляет сигнал в пул ожидающих потоков. Я очень хорошо знаком с Rx и с Reactive Cocoa, но здесь мне не хватает чего-то базового.

У меня есть базовое преобразование, которое выглядит следующим образом:

 WorkQueueDispatcher dispatcher = new WorkQueueDispatcher("dispatch", 10, 64, {... Exception handle code here ...}


return objectStream
            .partition(partitions)
            .dispatchOn(dispatcher)
            .merge()
            .map(new Function<Object, Object>() {
                @Override
                public Object apply(Object o) {
                    try {
                        return extension.eval(o, null);
                    } catch (UnableToEvaluateException e) {
                        e.printStackTrace();
                        return null;
                    }

                }
            });

Я пробовал этот поток примерно семью или восемью различными способами, включая разные диспетчеры и т. д. Я пытался разбить его на сгруппированный поток событий и обрабатывать каждый элемент отдельно, а затем записывать в отдельный поток для обработки. В любой ситуации я либо вижу, что все запросы обрабатываются в одном и том же потоке (который работает, а не многопоточен), либо получаю сообщение об ошибке, которое меня пугает:

   java.lang.IllegalStateException: Dispatcher provided doesn't support event     ordering.  For concurrent signal dispatching, refer to #partition()/groupBy()     method and assign individual single dispatchers. 
at reactor.core.support.Assert.state(Assert.java:387)
at reactor.rx.Stream.dispatchOn(Stream.java:720)
at reactor.rx.Stream.dispatchOn(Stream.java:650)

Я пробовал следующее:

  1. вручную делая раздел/группу.
  2. явно устанавливая отдельный однопоточный диспетчер (кольцо) для более ранних шагов.
  3. Просто говорю «фф», не работает, а просто сбрасывает в свою очередь для обработки.

Что мне здесь не хватает? Должен ли я не использовать вещательную компанию для запуска цикла сообщений? Я действительно не забочусь об исполнении заказа здесь.

(отредактировано)

Вот что я делаю со своим доморощенным кодом для масштабирования:

objectStream
        .consume(new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                final Object target = o;
                tpe.execute(new Runnable(){
                    /**
                     * When an object implementing interface <code>Runnable</code> is used
                     * to create a thread, starting the thread causes the object's
                     * <code>run</code> method to be called in that separately executing
                     * thread.
                     * <p/>
                     * The general contract of the method <code>run</code> is that it may
                     * take any action whatsoever.
                     *
                     * @see Thread#run()
                     */
                    @Override
                    public void run() {
                        try {
                            //System.out.println("On thread "+ Thread.currentThread().getName());
                            Timer.Context onNext = onNextTimer.time();
                            Timer.Context timer = callComponentTimer.time();
                            Object translated = extension.eval(target, null);
                            timer.close();
                            broadcaster.onNext(translated);
                            onNext.close();
                        } catch (UnableToEvaluateException e) {
                            e.printStackTrace();
                        }
                    }
                });

изменить

Хорошо, я обновил его следующим образом:

 MetricRegistry reg = DMPContext.getContext().getMetricRegistry();


    de.init(null);


    ConsoleReporter reporter = ConsoleReporter.forRegistry(DMPContext.getContext().getMetricRegistry())
            .convertRatesTo(TimeUnit.SECONDS)
            .convertDurationsTo(TimeUnit.MILLISECONDS)
            .build();

    reporter.start(10, TimeUnit.SECONDS);

    final CountDownLatch latch = new CountDownLatch(COUNT);

    final Function<String, Object> translator = JSON.from(Request.class);
    String content = new String(Files.readAllBytes(Paths.get("/svn/DMPidea/Request.json")));

    Broadcaster<String> stringBroadcaster = Broadcaster.create();

    final Exec exec = new Exec();


    stringBroadcaster
            .partition(10)
            .consume(new Consumer<GroupedStream<Integer, String>>() {
                @Override
                public void accept(GroupedStream<Integer, String> groupedStream) {

                    groupedStream.dispatchOn(Environment.cachedDispatcher()).map(translator).map(new Function<Object, Object>() {
                        @Override
                        public Object apply(Object o) {
                            try {
                                System.out.println("Got thread " +Thread.currentThread().getName());
                                return de.eval(o, null);
                            } catch (UnableToEvaluateException e) {
                                e.printStackTrace();
                                return null;
                            }
                        }
                    }).consume(new Consumer<Object>() {
                        @Override
                        public void accept(Object o) {
                            latch.countDown();
                        }
                    });
                }
            });

    for (int i=0; i<COUNT; i++)
    {

        stringBroadcaster.onNext(content);

    }
    latch.await();

Я все еще вижу однопоточное выполнение:

Получил диспетчер потокаGroup-1 Получил диспетчер потокаGroup-1 Получил диспетчер потокаGroup-1 Получил диспетчер потокаGroup-1 Получил диспетчер потокаGroup-1 Получил диспетчер потокаGroup-1 Получил диспетчер потокаGroup-1


person Josh P    schedule 29.03.2015    source источник


Ответы (3)


Шаблон в Reactor2.0 заключается в использовании отдельных CachedDispatchers (RingBufferDispatchers) вместо использования WorkQueueDispatcher или ThreadPoolExecutorDispatcher. Это предлагает разделение потока на разные потоки. Это работает только для распараллеливания небольших (обычно неблокирующих) операций в логические потоки.

Для полного неблокирующего выполнения в отдельном пуле потоков см. мою правку внизу.

Вы захотите использовать Stream.groupBy() или Stream.partition(), чтобы разделить поток на несколько потоков, каждый из которых может быть отправлен в отдельные потоки.

  1. Stream.groupBy() — разделяет потоки на основе возвращаемого вами ключа.
  2. Stream.partition([int]) — сегменты в потоки на основе хэш-кода потоковых объектов (сигналов) и, необязательно, в указанное вами количество сегментов.

После того, как вы разделили/сгруппировали поток на отдельные потоки, вы можете использовать flatMap(), чтобы указать вашим новым потокам отправляться в отдельные потоки.

.flatMap(stream -> stream.dispatchOn(Environment.cachedDispatcher())

Вызов Environment.cachedDispatcher() на самом деле означает получение CachedDispatcher (RingBufferDispatcher) из их пула. Пул по умолчанию имеет размер, эквивалентный количеству процессоров вашего компьютера. CachedDispatcher создаются лениво, поэтому это не идеально делать внутри вашего вызова .dispatchOn(Dispatcher).

Чтобы заранее создать пул (DispatcherSupplier) из CachedDispatchers, вы можете использовать Environment.newCachedDispatchers(int, [groupName]). Вот пример из документации ProjectReactor о том, как все это объединить:

    DispatcherSupplier supplier1 = Environment.newCachedDispatchers(2, "groupByPool");
    DispatcherSupplier supplier2 = Environment.newCachedDispatchers(5, "partitionPool");

    Streams
        .range(1, 10)
        .groupBy(n -> n % 2 == 0) 
        .flatMap(stream -> stream
                .dispatchOn(supplier1.get()) 
                .log("groupBy")
        )
        .partition(5) 
        .flatMap(stream -> stream
                .dispatchOn(supplier2.get()) 
                .log("partition")
        )
        .dispatchOn(Environment.sharedDispatcher()) 
        .log("join")
        .consume();

Документация Reactor: разделение потоков на отдельные потоки

Обратите внимание, что в этом примере они также вызвали .dispatchOn(Environment.sharedDispatcher()) после последнего вызова flatMap(). Это делается для того, чтобы просто объединить поток обратно в один поток Dispatcher. В данном случае Environment.sharedDispatcher(), что является еще одним RingBufferDispatcher.

Я использую эту стратегию, чтобы разделить мой поток на отдельные потоки, чтобы параллельно выполнять блокирующие вызовы на отдельных CachedDispatcher, а затем снова объединять их в один поток на основном Environment.sharedDispatcher(), все неблокирующим/реактивным способом, например так:

    // Spring's RestTemplate for making simple REST (HTTP) calls
    RestTemplate restTemplate = new RestTemplate();
    List<String> urls = Arrays.asList("http://www.google.com", "http://www.imgur.com");
    Streams
        .from(urls)
        .groupBy(s -> s) // unique group (stream) per URL
        .flatMap(stream -> 
            // dispatch on separate threads per stream
            stream.dispatchOn(Environment.cachedDispatcher())
                 // blocking call in separate dispatching thread
                 .map(s -> restTemplate.getForObject(s, String.class)))
        // rejoin into single dispatcher thread
        .dispatchOn(Environment.sharedDispatcher())
        .consume(
             s -> { // consumer
                 System.out.println("---complete in stream---");
             },
             t -> { // error consumer
                 System.out.println("---exception---");
                 System.out.println(t);
             },
             s -> { // complete consumer
                 latch.countDown();
                 System.out.println("---complete---");
             });



EDIT: Для параллельного выполнения блокирующих операций вы все равно можете использовать пул потоков. В Reactor 2.0 вы можете сделать это с помощью диспетчера, поддерживаемого пулом потоков, предпочтительно Executors.newCachedThreadPool(), потому что он будет повторно использовать потоки, ограничивая количество нагрузки на GC.

Самый простой способ сделать это, который я нашел, — это использовать EventBus с ThreadPoolExecutorDispatcher, поддерживаемым Executors.newCachedThreadPool() (который на самом деле просто ThreadPoolExecutor с несколькими специфическими настройками, если вы посмотрите на функцию для создания newCachedThreadPool):

    // dispatcher backed by cached thread pool for limited GC pressure
    Dispatcher dispatcher = new ThreadPoolExecutorDispatcher(1024, 1024, Executors.newCachedThreadPool());

    int n = 1000000;
    EventBus bus = EventBus.create(dispatcher);
    CountDownLatch latch = new CountDownLatch(n);
    bus.on($("topic"), (Event<Integer> ev) -> {
        blockingServiceCall(100); // block for 100ms
        latch.countDown();
        if (ev.getData() % 1000 == 0) {
            System.out.println(Thread.currentThread() + " " + Thread.activeCount());
        }
    });

    long start = System.currentTimeMillis();
    for(int i = 0; i < n; i++) {
        bus.notify("topic", Event.wrap(i));
    }
    latch.await();
    System.out.println("ops/sec: " + (n * 1.0) / ((System.currentTimeMillis() - start) / 1000.0) );

Вам нужно будет отправить обратно в Stream после завершения блокирующего вызова внутри потребителя, если вы хотите вернуться к использованию Streams. Насколько я знаю, вы не можете просто автоматически объединить их обратно в поток, а также не можете использовать ThreadPoolExecutorDispatcher, поддерживаемый кэшированным пулом потоков, напрямую, выполнив Stream.dispatchOn(threadPoolExecutorDispatcher).

person kprager    schedule 31.07.2015

Просто просканировав, вы получите правильный ответ как можно скорее. Но сначала проблема не связана с тем фактом, что вы выполняете merge(), а затем выполняете свою логику? Reactive Stream — это однопоточный вызов onXXX для каждой спецификации (также и в RxJava, и в Akka Stream).

Как и в Rx, я бы просто выполнял работу по обработке внутри плоского потока или просто потреблял, если вам не нужно объединяться обратно (что является спорным моментом, если вы думаете об этом).

return objectStream
        .partition(partitions)
        .consume( partitionStream ->
           partitionStream
             .dispatchOn(Environment.cachedDispatcher()) 
             .consume( o -> {
                try {
                    return extension.eval(o, null);
                } catch (UnableToEvaluateException e) {
                    e.printStackTrace();
                    return null;
                }

            }
        ));
person smaldini    schedule 30.03.2015
comment
WorkQueueDispatcher напрямую не поддерживается в Rx, однако cachedDispatcher возвращает объединенный диспетчер (по умолчанию — ringBuffer) из N (количество процессоров по умолчанию), поэтому в моем предыдущем примере я назначу N разделу с N различными диспетчерами, эффективно увеличивая масштаб. Также обратите внимание, что в 2.0 и более поздних версиях есть специальный нестандартный компонент, который заменит WorkQueueDispatcher сверхурочно, RingBufferWorkProcessor, который действительно является компонентом Reactive Streams, который масштабирует столько подписчиков, сколько вы хотите. - person smaldini; 30.03.2015
comment
Pred, хорошо, значит, dispatchOn выберет независимый поток из пула с 2.0? Когда я печатал название потока, оно всегда выходило в основной поток, а не в отдельный поток. Я использую общую настройку Static Enviornment. Может ли это быть проблема? - person Josh P; 31.03.2015
comment
Итак, я протестировал эту версию и, как ни странно, на данный момент я все еще наблюдаю однопоточное поведение. Я вижу поток, говорящий, что он исходит из диспетчерской группы, но threadId всегда является диспетчерской группой-1, и я не вижу никаких других созданных в настоящее время потоков. - person Josh P; 01.04.2015

Делал и со слиянием, и без. Не имело никакого значения. Кодовая база здесь 2.0. Меня устраивает однопоточность, как только я нажму на кнопку dispatchOn, но мне нужно воспользоваться преимуществами многоядерности (что, в конце концов, является серьезной причиной для использования FRP). Мои тесты не выявили каких-либо потоков, а печать имени потока не показала никакого разветвления на независимые потоки.

Обратите внимание, что здесь я использую Java 7 вместо 8 (и, черт возьми, убил бы я здесь лямда-функции и их синтаксис).

person Josh P    schedule 31.03.2015