Как дождаться окончания подписки?

Я хочу сделать асинхронный вызов отдыха, для которого я использую веб-клиент Spring и возвращаю Mono. Я также делаю некоторые вызовы базы данных параллельно, но по какой-то причине это нельзя сделать реактивно.

    Map<String, Object> models = new HashMap<>();

    Mono<User> users = this.webClient...;
    users.map(resp -> new UserState(userRequest, resp))
            .subscribe(response -> {
                models.put("userState", response);
            });
    Iterable<Product> messages = this.productRepository.findAll();
    models.put("products", messages);
    //Wait for users.subscribe to finish <<<<<<<<<<<<<HERE
    return new ModelAndView("messages/list", models);

Как мне дождаться завершения подписки, прежде чем возвращать ModelAndView. Это было бы легко, если бы я использовал Future, где я мог бы делать get(), когда захочу.


person Heisenberg    schedule 03.06.2019    source источник
comment
Разве это не противоречит цели асинхронной обработки? Если вам это нужно на 100%, используйте block().   -  person Paul Benn    schedule 03.06.2019
comment
@PaulBenn Я делаю вызов базы данных после подписки, которая, как я понимаю, выполняется параллельно. Много раз мой вызов REST завершался до того, как возвращался вызов БД. Я хочу обрабатывать случаи, когда это не так.   -  person Heisenberg    schedule 03.06.2019
comment
Вы можете создать Lock раньше и разблокировать его в обратном вызове подписки.   -  person daniu    schedule 04.06.2019
comment
@daniu Думаю, это может сработать. Но Mono<User> возвращается мной, а мои клиенты будут обрабатывать все остальное. Будет немного утомительно заставлять их разбираться с замками. Может быть, я могу отзвониться прямо от них, подписаться сам и вернуть блокировку в ответ напрямую. Но я действительно удивлен, что для этого нет встроенного метода. Думаю, мне придется вернуться к старому доброму Future.   -  person Heisenberg    schedule 04.06.2019


Ответы (1)


Вы можете обернуть блокирующий вызов в Mono, выполняемый в отдельном планировщике, заархивировать его с Mono, содержащим UserState данные, и преобразовать их комбинацию в Mono<ModelAndView> (который может быть возвращен из методов контроллера Spring). Вызовы будут выполняться параллельно, результаты будут объединены, когда оба вызова будут завершены.

Вы можете определить один ограниченный планировщик для каждого приложения специально для блокировки вызовов и предоставить его в качестве аргумента конструктора любому классу, выполняющему блокирующие вызовы.

Код будет выглядеть следующим образом:

@Configuration 
class SchedulersConfig {

  @Bean
  Scheduler parallelScheduler(@Value("${blocking-thread-pool-size}") int threadsCount) {
    return Schedulers.parallel(threadsCount);
  }
}

@RestController
class Controller {

  final Scheduler parallelScheduler;

  ...

  Mono<User> userResponse = // webClient...

  Mono<Iterable<Product>> productsResponse = Mono.fromSupplier(productRepository::findAll)
    .subscribeOn(parallelScheduler); 

  return Mono.zip(userResponse, productsResponse, (user, products) -> 
    new ModelAndView("messages/list", 
      ImmutableMap.of(
        "userState", new UserState(userRequest, user),
        "products", products
      ))
  );
}

Обновление на основе комментария:
Если вам просто нужно асинхронно выполнить вызов HTTP, а затем соединить его с результатами базы данных, вы можете сделать следующее

Map<String, Object> models = new HashMap<>();
Mono<User> userMono = webClient...;
CompletableFuture<User> userFuture = userMono.toFuture();
Iterable<Product> messages = productRepository.findAll();
User user = userFuture.join();
models.put("products", messages);
models.put("userState", new UserState(userRequest, user));
return new ModelAndView("messages/list", models);
person Ilya Zinkovich    schedule 04.06.2019
comment
Спасибо за ваш ответ. Но вызов БД не в моей руке. Я предоставил реактивную поддержку в своем SDK, и он вернет Mono‹User›, и клиенты будут использовать его с этого момента. Ваш ответ был моим первым предложением, но он слишком сложен для клиентов, поскольку в их коде происходит гораздо больше вещей. Они не хотят иметь дело с реактивным беспорядком. Им нужна функциональность типа Future. - person Heisenberg; 04.06.2019
comment
@Heisenberg, тогда просто превратите Mono<User> в CompletableFuture<User>, используя метод .toFuture() вместо использования .subscribe() и .join() в будущем после вызова БД. - person Ilya Zinkovich; 04.06.2019
comment
@IIya, это выглядит великолепно! Однако один вопрос: поскольку Mono<User> еще не подписан, как вы думаете, toFuture() начнет звонить? - person Heisenberg; 06.06.2019
comment
@Heisenberg да, toFuture() звонит .subscribe() внутри компании. - person Ilya Zinkovich; 06.06.2019