Schdulers.elastic не создает новые потоки в Reactor

Я пытаюсь создать поток, в котором поток испускает 10 элементов, каждый параллельно, причем каждый элемент спит в течение 1 секунды. Поскольку каждый элемент публикуется в отдельном потоке, я ожидаю, что весь процесс займет 1 с. Но журналы показывают, что вместо этого требуется 10 секунд.

Я попытался изменить subscribeOn на publishOn, map на doOnNext. Но ни один из них, похоже, не работает.

Я новичок в Reactor и пытаюсь понять, в чем я ошибаюсь. Любая помощь будет очень признательна. Спасибо

    public void whenIsANewThreadCreated() {
        Flux.range(1,10)
                .publishOn(Schedulers.elastic())
                .map(count -> {
                    logger.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
                    try {
                        Thread.sleep(1_000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return count;
                })
        .blockLast();
    }
2020-03-30 16:17:29.799  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 1
2020-03-30 16:17:30.802  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 2
2020-03-30 16:17:31.804  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 3
2020-03-30 16:17:32.805  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 4
2020-03-30 16:17:33.806  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 5
2020-03-30 16:17:34.808  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 6
2020-03-30 16:17:35.809  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 7
2020-03-30 16:17:36.811  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 8
2020-03-30 16:17:37.812  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 9
2020-03-30 16:17:38.814  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 10




Ответы (2)


Вы должны сначала создать параллельный поток, вызвав метод parallel, и вы должны использовать runOn для достижения параллелизма.

Flux.range(1,10)
    .parallel()
    .runOn(Schedulers.elastic())
    .map(count -> {
        System.out.println(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
        try {
            Thread.sleep(1_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return count;
    }).subscribe();

  • Используйте Schedulers.boundedElastic(), поскольку использование Scheduler.elastic() не рекомендуется
  • parallel по умолчанию создает потоки на основе ядра вашего процессора. Если вы хотите, чтобы больше потоков использовали parallel(10) - я думаю, это то, что вы хотите увидеть.
person vins    schedule 30.03.2020
comment
Спасибо. Это работает. Но я все еще не понимаю, почему описанный выше метод не работает. Кроме того, в parallel () меня ограничивает количество ядер процессора, которое у меня есть. В то время как Schedulers.elastic () позволяет мне неограниченное количество потоков. Кроме того, мой вариант использования не позволяет мне использовать параллель. Извините, я знаю, что это слишком много. Если у вас есть ссылки, на которые я могу сослаться, это тоже сработает. - person rgbk21; 31.03.2020
comment
Проверьте этот projectreactor.io/docs/core/release/reference - раздел 9.5. - person vins; 31.03.2020

Спецификация требует, чтобы onNext события вызывались последовательно. Ваш map эффективно превращает входные onNext события в onNext события, которые блокируются на 1 секунду. Согласно спецификации, 10 входящих onNext приводят к серии из 10 исходящих onNext, каждый из которых блокирует 1 с => 10 с.

Вы абсолютно на 100% ДОЛЖНЫ использовать parallel(10).runOn(Scheduler.elastic()), если хотите распределить эту блокирующую рабочую нагрузку по 10 параллельным рельсам. (Scheduler для runOn также может быть Schedulers.boundedElastic() или Schedulers.newParallel(10)).

person Simon Baslé    schedule 02.04.2020