Как сделать несколько вызовов Spring Webclient параллельно и дождаться результата?

Я новичок в реактивном программировании и хотел бы выполнить два вызова API параллельно, обработать результаты и вернуть простой массив или список элементов.

У меня есть две функции: одна возвращает Flux, а другая - Mono, и я создаю очень простую логику фильтрации для элементов, излучаемых Flux, в зависимости от результата этого Mono.

Я пробовал использовать zipWith, но до конца добрался только один элемент, независимо от логики фильтрации. Также я пробовал с block, но это запрещено внутри контроллера: /

@GetMapping("/{id}/offers")
fun viewTaskOffers(
        @PathVariable("id") id: String,
        @AuthenticationPrincipal user: UserPrincipal
) : Flux<ViewOfferDTO> {
    data class TaskOfferPair(
        val task: TaskDTO,
        val offer: ViewOfferDTO
    )

    return client.getTaskOffers(id).map {
            it.toViewOfferDTO()
        }.zipWith(client.getTask(id), BiFunction {
            offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
        }).filter {
            it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
        }.map {
            it.offer
        }
}
  • getTaskOffers возвращает поток OfferDTO
  • getTask возвращает моно TaskDTO

Если вы не можете ответить на мой вопрос, скажите мне хотя бы, как выполнять несколько вызовов API параллельно и ждать результатов в WebClient.


person Ahmed H. Saab    schedule 05.06.2019    source источник


Ответы (3)


Вот пример использования параллельного вызова.

public Mono<UserInfo> fetchCarrierUserInfo(User user) {
        Mono<UserInfo> userInfoMono = fetchUserInfo(user.getGuid());
        Mono<CarrierInfo> carrierInfoMono = fetchCarrierInfo(user.getCarrierGuid());

        return Mono.zip(userInfoMono, carrierInfoMono).map(tuple -> {
            UserInfo userInfo = tuple.getT1();
            userInfo.setCarrier(tuple.getT2());
            return userInfo;
        });
    }

Здесь:

  • fetchUserInfo выполняет http-вызов для получения информации о пользователе из другой службы и возвращает Mono
  • fetchCarrierInfo метод выполняет HTTP-вызов для получения информации о носителе от другой службы и возвращает Mono
  • Mono.zip() объединяет данные моно в новое моно, которое будет выполнено, когда все данные моно произведут элемент, объединяя их значения в кортеж2.

Затем вызовите fetchCarrierUserInfo().block(), чтобы получить окончательный результат.

person Sarvar Nishonboyev    schedule 09.12.2019

Как вы уже выяснили, zipWith здесь вам не поможет, так как будет выдавать min(a.size, b.size), которое всегда будет равно 1, если один из них Mono.

Но поскольку эти двое независимы, их можно просто разделить:

val task: Mono<TaskDTO> = client.getTask(id)
val result: Flux<ViewOfferDTO> = 
task.flatMapMany {t ->
        client.getTaskOffers(id).map {offer ->
            t to offer
        }
    }.filter {
        it.second.workerUser.id == user.id || it.first.creatorUser == user.id
    }.map {
        it.second
}

Обратите внимание: если вы хотите иметь пару элементов, вы можете использовать встроенный Pair.

Кроме того, эта проверка не имеет особого смысла, поскольку у вас есть только Mono: it.first.creatorUser

person Alexey Soshin    schedule 05.06.2019
comment
Также getTaskOffers возвращает Flux, поэтому при сопоставлении будет возвращено одно предложение, а не предложения. - person Ahmed H. Saab; 06.06.2019
comment
Помните, у меня нет вашего реального кода, поэтому я не могу быть на 100% точным. Тем не менее, я расширил свой ответ. Надеюсь, это поможет. - person Alexey Soshin; 06.06.2019

Преобразуйте Mono в Flux с помощью repeat ():

client.getTask(id).cache().repeat();

Итак, ваш код станет

    return client.getTaskOffers(id).map {
        it.toViewOfferDTO()
    }.zipWith(client.getTask(id).cache().repeat(), BiFunction {
        offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
    }).filter {
        it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
    }.map {
        it.offer
    }
person McGin    schedule 10.12.2019