Я хочу вызвать независимый запрос одновременно с WebClient
. Моя предыдущая попытка с RestTemplate
блокировала мои потоки в ожидании ответа. Итак, я понял, что WebClient
с ParallelFlux
может использовать один поток более эффективно, потому что он должен планировать несколько запросов с одним потоком.
Моя конечная точка запрашивает тупель id
и location
.
Метод fooFlux
будет вызываться несколько тысяч раз в цикле с разными параметрами. Возвращенная карта будет утверждена относительно сохраненных опорных значений.
Предыдущие попытки использования этого кода приводили к дублированию вызовов API. Но недостаток все же есть. Размер набора ключей mapping
часто меньше размера Set<String> location
. Фактически, размер получившейся карты меняется. Кроме того, время от времени это правильно. Таким образом, может возникнуть проблема с завершением индекса после того, как метод вернул карту.
public Map<String, ServiceDescription> fooFlux(String id, Set<String> locations) {
Map<String, ServiceDescription> mapping = new HashMap<>();
Flux.fromIterable(locations).parallel().runOn(Schedulers.boundedElastic()).flatMap(location -> {
Mono<ServiceDescription> sdMono = getServiceDescription(id, location);
Mono<Mono<ServiceDescription>> sdMonoMono = sdMono.flatMap(item -> {
mapping.put(location, item);
return Mono.just(sdMono);
});
return sdMonoMono;
}).then().block();
LOGGER.debug("Input Location size: {}", locations.size());
LOGGER.debug("Output Location in map: {}", mapping.keySet().size());
return mapping;
}
Обработка Get-Request
private Mono<ServiceDescription> getServiceDescription(String id, String location) {
String uri = URL_BASE.concat(location).concat("/detail?q=").concat(id);
Mono<ServiceDescription> serviceDescription =
webClient.get().uri(uri).retrieve().onStatus(HttpStatus::isError, clientResponse -> {
LOGGER.error("Error while calling endpoint {} with status code {}", uri,
clientResponse.statusCode());
throw new RuntimeException("Error while calling Endpoint");
}).bodyToMono(ServiceDescription.class).retryBackoff(5, Duration.ofSeconds(15));
return serviceDescription;
}
JsonNode.class
, а не сериализуете / десериализуете в конкретный объект? и зачем использовать реактивное программирование для того, что можно решить с помощью@Async
. Реактивное программирование - это не асинхронное программирование. Это две разные вещи, которые дополняют друг друга. - person Toerktumlare   schedule 29.02.2020JsonNode.class
, потому что полученная модель JSON огромна, и мне просто нужно ее немного. Я придумал реактивное программирование из-за статьи о baeldung (baeldung.com/spring-webclient-resttemplate < / а>). Я хочу заархивировать прирост скорости загрузки. ПодходRestTemplate
в ParallelStreams обеспечил мне скорость загрузки от 10 Мбит / с до 200 Мбит / с на мою сетевую карту. В зависимости от количества локаций на id. Но это варьируется от 1 до ~ 4000 - person froehli   schedule 29.02.2020block()
. Думаю, первого можно было бы сэкономить. - person froehli   schedule 29.02.2020Mono
в своей функцииflatMap
, помещаю его на карту и все равно возвращаю тот жеMono
. Смотрите счетчик интуитивно понятный. - person froehli   schedule 29.02.2020