Я неправильно использую rxJava, преобразовывая наблюдаемое в наблюдаемое блокирование?

Мой API делает около 100 нисходящих вызовов попарно к двум отдельным службам. Все ответы должны быть объединены, прежде чем я смогу вернуть свой ответ клиенту. Я использую hystrix-feign для выполнения HTTP-вызовов.

Я придумал то, что считал элегантным решением, пока не нашел документы по rxJava я нашел следующее

BlockingObservable — это разновидность Observable, предоставляющая блокирующие операторы. Это может быть полезно для тестирования и демонстрационных целей, но, как правило, не подходит для производственных приложений (если вы считаете, что вам нужно использовать BlockingObservable, это обычно является признаком того, что вам следует пересмотреть свой дизайн).

Мой код выглядит примерно так

List<Observable<C>> observables = new ArrayList<>();
for (RequestPair request : requests) {
    Observable<C> zipped = Observable.zip(
         feignClientA.sendRequest(request.A()),
         feignClientB.sendRequest(request.B()),
         (a, b) -> new C(a,b));
    observables.add(zipped);
}

Collection<D> apiResponse = = new ConcurrentLinkedQueue<>();

Observable
    .merge(observables)
    .toBlocking()
    .forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse)));

return apiResponse;

Несколько вопросов, основанных на этой настройке:

  1. Является ли toBlocking() оправданным, учитывая мой вариант использования
  2. Правильно ли я понимаю, что фактические HTTP-вызовы не выполняются до тех пор, пока основной поток не доберется до forEach()?
  3. Я видел, что код в блоке forEach() выполняется разными потоками, но мне не удалось проверить, может ли в блоке forEach() быть более одного потока. Выполняется ли там одновременно?

person Lukasz Krawiec    schedule 23.11.2016    source источник


Ответы (1)


Лучшим вариантом является возврат Observable для использования другими операторами, но вам может сойти с рук блокирующий код (однако он должен выполняться в фоновом потоке).

public Observable<D> getAll(Iterable<RequestPair> requests) {
    return Observable.from(requests)
    .flatMap(request ->
        Observable.zip(
            feignClientA.sendRequest(request.A()),
            feignClientB.sendRequest(request.B()),
            (a, b) -> new C(a,b)
        )
    , 8)  // maximum concurrent HTTP requests
    .map(both -> doSomeWork(both));
}

// for legacy users of the API
public Collection<D> getAllBlocking(Iterable<RequestPair> requests) {
    return getAll(requests)
        .toList()
        .toBlocking()
        .first();
}

Правильно ли я понимаю, что фактические HTTP-вызовы не выполняются до тех пор, пока основной поток не доберется до forEach()?

Да, forEach запускает всю последовательность операций.

Я видел, что код в блоке forEach() выполняется разными потоками, но мне не удалось проверить, может ли в блоке forEach() быть более одного потока. Выполняется ли там одновременно?

Только одному потоку разрешено выполнять лямбда-выражение в forEach, но вы действительно можете видеть, что туда входят разные потоки.

person akarnokd    schedule 23.11.2016