Параллельный запрос GET для конкретного сопоставления с WebFlux

Я хочу вызвать независимый запрос одновременно с WebClient. Моя предыдущая попытка с RestTemplate блокировала мои потоки в ожидании ответа. Итак, я понял, что WebClient с ParallelFlux может использовать один поток более эффективно, потому что он должен планировать несколько запросов с одним потоком.

Моя конечная точка запрашивает тупель id и location.

Метод fooFlux будет вызываться несколько тысяч раз в цикле с разными параметрами. Возвращенная карта будет утверждена относительно сохраненных опорных значений.

Предыдущие попытки использования этого кода приводили к дублированию вызовов API. Но недостаток все же есть. Размер набора ключей mapping часто меньше размера Set<String> location. Фактически, размер получившейся карты меняется. Кроме того, время от времени это правильно. Таким образом, может возникнуть проблема с завершением индекса после того, как метод вернул карту.

public Map<String, ServiceDescription> fooFlux(String id, Set<String> locations) {
    Map<String, ServiceDescription> mapping = new HashMap<>();
    Flux.fromIterable(locations).parallel().runOn(Schedulers.boundedElastic()).flatMap(location -> {
        Mono<ServiceDescription> sdMono = getServiceDescription(id, location);
        Mono<Mono<ServiceDescription>> sdMonoMono = sdMono.flatMap(item -> {
            mapping.put(location, item);
            return Mono.just(sdMono);
        });
        return sdMonoMono;
    }).then().block();
    LOGGER.debug("Input Location size: {}", locations.size());
    LOGGER.debug("Output Location in map: {}", mapping.keySet().size());
    return mapping;
}

Обработка Get-Request

private Mono<ServiceDescription> getServiceDescription(String id, String location) {
    String uri = URL_BASE.concat(location).concat("/detail?q=").concat(id);
    Mono<ServiceDescription> serviceDescription =
                    webClient.get().uri(uri).retrieve().onStatus(HttpStatus::isError, clientResponse -> {
                        LOGGER.error("Error while calling endpoint {} with status code {}", uri,
                                        clientResponse.statusCode());
                        throw new RuntimeException("Error while calling Endpoint");
                    }).bodyToMono(ServiceDescription.class).retryBackoff(5, Duration.ofSeconds(15));
    return serviceDescription;
}

person froehli    schedule 29.02.2020    source источник
comment
почему вы используете JsonNode.class, а не сериализуете / десериализуете в конкретный объект? и зачем использовать реактивное программирование для того, что можно решить с помощью @Async. Реактивное программирование - это не асинхронное программирование. Это две разные вещи, которые дополняют друг друга.   -  person Toerktumlare    schedule 29.02.2020
comment
Я прочитал ваш ответ от 5 августа 1919 года о RestTemplate и WebClient. Я больше нигде не использую реактивное программирование. Но моя конечная точка имеет балансировщик нагрузки, и я могу создавать модули, чтобы получить необходимые потоки на моем сервере. Вся суть моего подхода заключается в том, чтобы время от времени проверять данные на бэкэнде.   -  person froehli    schedule 29.02.2020
comment
RestClients не влияют на скорость загрузки. Пропускная способность сети влияет на скорость. Итак, какой клиент вы используете, не повлияет на скорость загрузки. И если вам нужен только маленький кусочек, кто сказал, что вам нужно объявить объект целиком? просто создайте класс с нужным вам кусочком. Да, используйте WebClient, но вам нужно знать разницу между реактивным программированием и параллельным программированием.   -  person Toerktumlare    schedule 29.02.2020
comment
Реактивное программирование - это отсутствие блокировок и максимальное использование потоков для выполнения как последовательных, так и параллельных задач. В то время как параллельное программирование должно делать то, что вы хотите, одновременно порождать потоки и получать данные. Реактивное программирование может выполнять действия последовательно и одновременно для решения поставленной вами задачи. Но он не используется преимущественно для выполнения асинхронных задач.   -  person Toerktumlare    schedule 29.02.2020
comment
Я постараюсь объявить подмножество модели. Никогда об этом не думал. Спасибо! Я перефразировал свой вопрос в исходном посте. Вам не кажется, что этот подход может улучшить производительность для получения всей информации, поскольку мои потоки не блокируются во время ожидания сетевого ввода-вывода? Не могли бы вы взглянуть на мои два звонка block(). Думаю, первого можно было бы сэкономить.   -  person froehli    schedule 29.02.2020
comment
хорошо, если вам нужно собрать все результаты и получить конкретные значения для возврата вызывающему клиенту в одном большом двоичном объекте, а не передавать результаты вызывающему клиенту, тогда в вашем приложении, которое не является реактивным, необходимо использовать блок. Но как вы это сделали, планировщик поставил на собственный. Но я бы предложил использовать планировщик boundedElastic, тот, который вы выбрали, будет использовать потоки до бесконечности, а в худшем случае произойдет нехватка потоков и выйдет из строя приложение.   -  person Toerktumlare    schedule 29.02.2020
comment
Отлично, я сразу займусь этим. Итак, изменив планировщик и сериализацию json, я готов. Спасибо за вашу помощь! И подумайте, что мне нужен весь блог, потому что я проверяю его в JUnit-Test по значениям из другого источника.   -  person froehli    schedule 29.02.2020
comment
Мне все еще интересно, что я блокирую Mono в своей функции flatMap, помещаю его на карту и все равно возвращаю тот же Mono. Смотрите счетчик интуитивно понятный.   -  person froehli    schedule 29.02.2020


Ответы (2)


public Map<String, ServiceDescription> fooFlux(String id, Set<String> locations) {
    return Flux.fromIterable(locations)
               .flatMap(location -> getServiceDescription(id, location).map(sd -> Tuples.of(location, sd)))
               .collectMap(Tuple2::getT1, Tuple2::getT2)
               .block();
}

Примечание: оператор flatMap в сочетании с вызовом WebClient дает вам одновременное выполнение, поэтому нет необходимости использовать ParallelFlux или какой-либо Scheduler.

person Martin Tarjányi    schedule 01.03.2020
comment
Спасибо, Мартин, я думаю, в этом все дело. Я не мог найти или исправить использование метода collectMap в этом контексте. Не могли бы вы пояснить, почему бы вам не использовать ParallelFlux с Scheduler? Я думал, что пул общих потоков рабочих модулей ускорится в ситуациях ввода-вывода. - person froehli; 02.03.2020
comment
Насколько я понимаю, ParallelFlux предназначен для интенсивной работы процессора. Параллельный ввод-вывод может быть достигнут без него, так как во время ожидания внешнего ресурса ЦП не используется, поэтому высокий объем параллелизма может быть достигнут с очень низким числом потоков. ParallelFlux просто усложняйте вещи, чем нужно. - person Martin Tarjányi; 02.03.2020

Реактивный код запускается, когда вы подписываетесь на производителя. Блок подписывается, и поскольку вы вызываете блок дважды (один раз в Mono, но снова возвращаете Mono, а затем вызываете блок в ParallelFlux), Mono выполняется дважды.

    List<String> resultList = listMono.block();
    mapping.put(location, resultList);
    return listMono;

Вместо этого попробуйте что-нибудь вроде следующего (не проверено):

    listMono.map(resultList -> {
       mapping.put(location, resultList);
       return Mono.just(listMono);
    });

Тем не менее, модель реактивного программирования довольно сложна, поэтому рассмотрите возможность работы с @Async и _4 _ / _ 5_, если речь идет только о параллельном вызове удаленного вызова, как предлагали другие. Вы все еще можете использовать WebClient (кажется, что RestTemplate устареет), но просто вызывайте блок сразу после bodyToMono.

person Puce    schedule 29.02.2020
comment
Спасибо за ваш ответ. Это помогло мне избавиться от повторяющихся запросов. Чтобы предотвратить пустую карту, мне пришлось привязать Mono.just(listMono) vlaue к переменной, которую мне нужно вернуть в конце внешнего flatMap. Сначала я попытался вернуть listMono, не создавая новый Mono с Mono.just(listMono). Это также привело к дублированию запросов. Не могли бы вы объяснить такое поведение? - person froehli; 01.03.2020
comment
К сожалению, заполненная карта не заполняется полностью. Размер набора ключей mapping часто меньше размера Set<String> location. Фактически, размер получившейся карты меняется. Нужна ли мне внутренняя подписка или что-то еще, чтобы карта была полностью заполнена до возврата из метода? Я обновил свой вопрос, чтобы отразить изменения, которые я внес во время ответов здесь. - person froehli; 01.03.2020