Reactor Mono против CompletableFuture

Только начинаю изучать проект реактора и его абстракции Mono и Flux, и хотел бы понять основные различия с базовыми модулями Java 8 CompletableFuture.

Вот простой код, который у меня есть:

public static void main(String[] args) throws Exception {

    Mono.fromCallable(() -> getData())
            .map(s -> s + " World ")
            .subscribe(s -> System.out.println(s));

    CompletableFuture.supplyAsync(() -> getData())
            .thenAccept(System.out::println);

    System.out.println(Thread.currentThread()+" End ");
}

private static String getData() {

    int j=0;

    for(int i=0; i<Integer.MAX_VALUE; i++){
        j = j - i%2;
    }

    System.out.println(Thread.currentThread()+" - "+j);
    return " Hello ";
}

Во-первых, с CompletableFuture никаких сюрпризов. supplyAsync планирует выполнение функции через ForkJoinPool, и строка «End» распечатывается немедленно, и программа завершается, поскольку основной поток здесь действительно недолговечен - как и ожидалось.

Но Mono.fromCallable(...) блокирует там основной поток. Кроме того, имя потока, которое печатается в функции getData(), является основным потоком. Поэтому я вижу последовательное / блокирующее поведение, а не последовательное / неблокирующее (асинхронное) поведение. Это блокируется из-за того, что я применил функцию подписки в том же потоке? Кто-нибудь может объяснить это, пожалуйста?


person user1189332    schedule 30.11.2017    source источник


Ответы (1)


Это блокируется из-за того, что я применил функцию подписки в том же потоке?

Кажется, именно это и происходит.

Это специфическое поведение меня немного удивляет, поскольку это не то, как ведет себя большинство конвейеров. В большинстве конвейеров так или иначе есть операции, которые делают конвейер асинхронным. publishOn, subscribeOn - очевидные примеры, но также _ 3_ может иметь такой эффект и, возможно, многие другие. В таких случаях подписка вернется немедленно.

Это намекает на очень важный момент относительно реактивного программирования: конвейеры не должны содержать длинных блокирующих вызовов. Реактивный конвейер предназначен для подготовки и подписки на события процесса без блокировки. Таким образом, блокирующие операторы могут полностью заблокировать выполнение. Используя Scheduler, вы можете ограничить такие вызовы специальными пулами потоков и, таким образом, контролировать их действие.

person Jens Schauder    schedule 01.12.2017