Mono/Flux, опубликованный в ExecutorService, не завершается должным образом

Я немного протестировал предложение в этой ветке: служба-исполнитель

и я немного упростил пример, чтобы его было легче понять. Итак, вот пример:

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    Flux.just("1", "2", "3").subscribeOn(Schedulers.fromExecutorService(executorService)).doOnNext(System.out::println).subscribe();

    try {
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } finally {
        executorService.shutdownNow();
    }

Но теперь, если это выполняется, он всегда ждет 10 секунд до завершения основного потока. Чего я ожидал? Я ожидал, что служба-исполнитель будет ждать не более 10 секунд, пока не продолжит работу и не вызовет завершение работы. Обычно это должно быть сделано за несколько миллисекунд и сразу же после печати 1, 2, 3. javadoc говорит здесь:

Блокируется до тех пор, пока все задачи не завершат выполнение после запроса на завершение работы, или не истечет время ожидания, или текущий поток не будет прерван, в зависимости от того, что произойдет раньше.

Я не понимаю. Что здесь не так?

Другой пример, который запускается и сразу же завершается (но, на мой взгляд, неправильный):

        ExecutorService executorService = Executors.newSingleThreadExecutor();
    Flux.range(1, 1_000_000).subscribeOn(Schedulers.fromExecutorService(executorService)).doOnNext(System.out::println).subscribe();
    executorService.shutdownNow();

НО, здесь я ожидал, что основной поток не будет ждать завершения потока (соответственно службы исполнителя). Но это так. Насколько я понимаю, два примера ведут себя полностью вверх ногами в соответствии с описанием javadoc. javadoc говорит:

Этот метод не ожидает завершения активно выполняющихся задач. Для этого используйте awaitTermination.

Любые идеи?

С уважением Бернадо


person Bernado    schedule 11.11.2019    source источник
comment
Перед awaitTermination нужно позвонить shutdown. Вы не вызываете shutdown в своем коде. Так что не shutdownNow, а shutdown! Если вы не вызовете это, он будет ждать тайм-аута через 10 секунд.   -  person M. Deinum    schedule 11.11.2019
comment
О, да. Спасибо, это решение для части кода 1. Но что с примером 2? Почему он ждет завершения, хотя я не добавил executorService.awaitTermination(...);   -  person Bernado    schedule 11.11.2019
comment
Он не ждет, он просто завершается и не дает никаких гарантий. Однако, по-видимому, вы выполняете ту небольшую работу, которую он успевает закончить до выключения или во время выключения,   -  person M. Deinum    schedule 11.11.2019
comment
Ну... думаю, я понял. Нет гарантии, что основной поток будет ждать завершения, верно? Даже порядок shutdown -> await не гарантирует, что он будет действовать как thread.join?   -  person Bernado    schedule 11.11.2019
comment
Это не означает, что он выключен, awaitTermination возвращает boolean, что в случае false означает, что он еще не выключился. Вот почему вы часто видите awaitTermination в цикле while. while(!awaitTermination(1, SECONDS) {}. Который будет зацикливаться до тех пор, пока он действительно не завершится, а затем очистить ресурсы.   -  person M. Deinum    schedule 11.11.2019
comment
Извините... но еще раз... код flux.subscribe -> shutdown -> awaittermination семантически не равен thread.join, верно? то, что я изначально хотел сделать, это: создать поток, опубликовать его в новом потоке и дождаться завершения в основном потоке. Например, следующий код завершается немедленно и оценивает значение awaittermination как true: Flux.range(1, 3).publishOn(Schedulers.fromExecutorService(executorService)).delayElements(Duration.ofSeconds(3)) .subscribe(); executorService.shutdown(); System.out.println(executorService.awaitTermination(20, TimeUnit.SECONDS));   -  person Bernado    schedule 11.11.2019
comment
Это не равно thread.join. Отключение приведет к отключению, если задачи будут завершены. awaitTermination увидит это и закончит, как только все будет сделано. Если у вас нет длительной задачи, занимающей до 30 секунд, она вернет false. Я настоятельно рекомендую вам прочитать javadocs ExecutorService.   -  person M. Deinum    schedule 11.11.2019
comment
Хорошо, я прочитаю javadoc... ;-) Еще раз извините, но даже если у меня ЕСТЬ длительная задача, выключение завершает ее, даже если есть вызов awaittermination...   -  person Bernado    schedule 11.11.2019
comment
Если это не цикл while, который не заканчивается до тех пор, пока он действительно не завершится, он просто подождет x раз, а затем уничтожит программу.   -  person M. Deinum    schedule 12.11.2019
comment
Извините, но похоже, что это не так. Пожалуйста, попробуйте мой пример четырьмя комментариями выше. Он немедленно завершается, хотя есть ожидание...   -  person Bernado    schedule 12.11.2019
comment
Который. shutdownNow не будет ждать. shutdown будет ждать, если вы используете awaitTermination, иначе просто остановит программу.   -  person M. Deinum    schedule 12.11.2019
comment
Есть ожидание. Итак, в чем идея вызова завершения работы shutdown/await, если это бесполезно или ожидание не имеет никакого эффекта? Пример в комментарии, начинающийся с «извините»... 18 часов назад…   -  person Bernado    schedule 12.11.2019
comment
Он будет ждать 10 секунд или, если задача завершена до этого, немедленно завершится. Если это займет больше 10 секунд, ваш образец не остановится, потому что нет цикла while.   -  person M. Deinum    schedule 12.11.2019
comment
Думаю, это обсуждение ни к чему не приведет. ;) есть ожидание 20 сек, а задача должна сделать за 9 (3*3), но тут же останавливается. Пожалуйста, попробуйте...   -  person Bernado    schedule 12.11.2019
comment
Нет образца, который делает это за 9 секунд, нет ограничений в вашем потоке обработки. Просто распечатать несколько цифр.   -  person M. Deinum    schedule 12.11.2019
comment
Flux.range(1, 3).publishOn(Schedulers.fromExecutorService(executorService)).delayElements(Duration.ofSeconds(3)) .subscribe(); executorService.shutdown(); System.out.println(executorService.awaitTermination(20, TimeUnit.SECONDS));   -  person Bernado    schedule 12.11.2019
comment
delay переместит его в другой планировщик, и с точки зрения однопоточного исполнителя он завершится, потому что все переместилось в другой поток.   -  person M. Deinum    schedule 12.11.2019
comment
ааа, теперь я/мы поняли....спасибо! ;-) если я делаю thread.sleep, например. в карте (), так что это тот же поток, который работает, как ожидалось   -  person Bernado    schedule 12.11.2019