Только начинаю изучать проект реактора и его абстракции Mono и Flux, и хотел бы понять основные различия с базовыми модулями Java 8 CompletableFuture.
Вот простой код, который у меня есть:
public static void main(String[] args) throws Exception {
Mono.fromCallable(() -> getData())
.map(s -> s + " World ")
.subscribe(s -> System.out.println(s));
CompletableFuture.supplyAsync(() -> getData())
.thenAccept(System.out::println);
System.out.println(Thread.currentThread()+" End ");
}
private static String getData() {
int j=0;
for(int i=0; i<Integer.MAX_VALUE; i++){
j = j - i%2;
}
System.out.println(Thread.currentThread()+" - "+j);
return " Hello ";
}
Во-первых, с CompletableFuture
никаких сюрпризов. supplyAsync
планирует выполнение функции через ForkJoinPool, и строка «End» распечатывается немедленно, и программа завершается, поскольку основной поток здесь действительно недолговечен - как и ожидалось.
Но Mono.fromCallable(...)
блокирует там основной поток. Кроме того, имя потока, которое печатается в функции getData()
, является основным потоком. Поэтому я вижу последовательное / блокирующее поведение, а не последовательное / неблокирующее (асинхронное) поведение. Это блокируется из-за того, что я применил функцию подписки в том же потоке? Кто-нибудь может объяснить это, пожалуйста?