Как я могу асинхронно вызывать несколько наблюдаемых вызовов, но синхронно выполнять некоторые вычисления до и после этих вызовов?

Я новичок в RxJava. У меня есть несколько клиентов Jersey RxJava, которые возвращают Observables. Мне нужно сделать один вызов, чтобы получить некоторые данные, эти данные станут входными данными для моих следующих 3 вызовов. Я хочу, чтобы эти звонки были сделаны параллельно. Наконец, я хочу выполнить расчет после завершения всех вызовов, для которого требуются все данные. Вот как это выглядит:

interface Service {
  Observable<ResultA> callServiceA(InitialData input);
  Observable<ResultB> callServiceB(ResultA resultA);
  Observable<ResultC> callServiceC(ResultA resultA);
  Observable<ResultD> callServiceD(ResultA resultA);
  FinalResult simpleCalculation(ResultA a, ResultB b, ResultC c, ResultD d);
}

class MyClass{

   @Autowired
   ExecutorService myExecutorService;

   Observable<FinalResult> myMethod(InitialData initialData){
   /* Make call to ServiceA, get the results, then make calls to services B, C, and D in parallel (on different threads), finally perform simpleCalculation, and emit the result */
   }
}

person Adam    schedule 27.07.2016    source источник


Ответы (2)


flatMap() и zip() ваши друзья в этой ситуации.

Observable<FinalResult> myMethod(InitialData initialData) {
    return service
            .callServiceA(initialData)
            .flatMap(resultA -> Observable.zip(
                    service.callServiceB(resultA),
                    service.callServiceC(resultA),
                    service.callServiceD(resultA),
                    (resultB, resultC, resultD) -> 
                      service.simpleCalculation(resultA, resultB, resultC, resultD))
            );
}

Использование return observable будет выглядеть так:

Subscription subscription =
        myMethod(new InitialData())
                .subscribe(finalResult -> {
                            // FinalResult will end up here.
                        },
                        throwable -> {
                            // Handle all errors here.
                        });
person kjones    schedule 28.07.2016
comment
Спасибо! Как я могу заставить его вызывать службы B, C и D параллельно в разных потоках? Мне не нужно добавлять планировщик? - person Adam; 28.07.2016
comment
Так много людей используют Retrofit для своих сетевых вызовов, которые автоматически планируют вызовы параллельно. Если ваша служба выполняет их синхронно, добавьте .subscribeOn(Schedulers.io()) к каждому вызову. - person kjones; 28.07.2016
comment
Я не использую Retrofit. Я использую клиент Джерси. Раньше я добавлял .observeOn(Schedulers.from(executorService)) в конец цепочки. Чем это отличается от .subscribeOn? - person Adam; 28.07.2016
comment
Я не совсем понимаю последствия использования Jersey Client. Вот их ссылка на асинхронный Rx jersey.java .net/documentation/latest/ - person kjones; 28.07.2016
comment
Когда вы подписываетесь на Observable, он будет выполняться в текущем потоке, если планировщик .subscribeOn() уже не добавлен к Observable. Если ваш Observable является синхронным, использование только .observeOn(Schedulers.from(executorService)) выполнит сетевой запрос в текущем потоке, а затем отправит результат в executorService. Использование .subscribeOn(Schedulers.from(executorService)) освободит текущий поток при подписке и инициирует запрос с помощью executorService. - person kjones; 28.07.2016
comment
Будем надеяться, что клиент Джерси позволяет каким-то образом настроить асинхронное выполнение всех вызовов. В худшем случае, если по какой-то причине все наблюдаемые клиента Джерси являются синхронными, было бы добавить слой над клиентом трикотажа, который преобразует каждый из их синхронных наблюдаемых объектов в асинхронные наблюдаемые объекты. - person kjones; 28.07.2016

Вы можете использовать flatMap для выполнения синхронного вызова, затем использовать zip или слияние для отправки нескольких вызовов, а затем снова использовать flatMap после завершения.

person Graeme    schedule 27.07.2016