Понимание потоков данных и нескольких подписчиков (с использованием модернизации)

Скажем, у меня есть 2 наблюдаемых (A и B), которые по сути являются сетевыми вызовами (с использованием Retrofit для предоставления контекста).

Текущий поток приложения выглядит следующим образом:

  • A и B запускаются примерно в одно и то же время (асинхронно).
  • B выполняется 0 или более раз при взаимодействии с пользователем

У меня есть 3 разных сценария, которые я хочу прослушать, учитывая эти 2 вызова observables/api.

  1. Я хочу знать сразу, когда Observable A завершится
  2. Я хочу знать немедленно, когда Observable B завершится
  3. Я хочу знать, когда оба завершили

Во-первых, это хороший вариант использования RxJava?

Я знаю, как выполнять каждый сценарий по отдельности (используя zip для последнего), хотя я не знаю, как выполнять их все одновременно.

Если я подпишусь на Observable A, начнется A. Если я подпишусь на B, начнется B. Если A и B завершатся до того, как я подпишусь на zip(a, b), я могу пропустить событие и никогда не увидеть его завершенным, верно?

Любое общее руководство будет оценено. Мои знания RxJava довольно скудны: P


person loeschg    schedule 09.12.2014    source источник


Ответы (1)


Вы можете добиться этого, используя три разных наблюдаемых, по одному для каждого вашего случая.

Поскольку вам придется обмениваться состояниями между каждыми наблюдаемыми, вам придется преобразовать холодные наблюдаемые объекты в горячие наблюдаемые. (дополнительную информацию по этой теме см. здесь)

ConnectableObservable a = service.callA().publish(); 
ConnectableObservable b = service.callB().publish();

a.subscribe((e) -> { /* onNext */ }, (ex) -> {/* onError */},  () -> {/* when A is completed */ });
b.subscribe((e) -> { /* onNext */ }, (ex) -> {/* onError */},  () -> {/* when B is completed */ });
a.mergeWith(b).subscribe((e) -> { /* onNext */ }, (ex) -> {/* onError */},  () -> {/* when A and B are completed */ });

a.connect(); // start subscription to a
b.connect(); // start subscription to b

Не делитесь объектом между методами onCompleted, иначе вам придется иметь дело с проблемами параллелизма.

person dwursteisen    schedule 10.12.2014
comment
Круто, это было действительно полезно. Не могли бы вы немного рассказать о проблемах параллелизма, с которыми я могу столкнуться? Если я наблюдаю за всеми ими в одном потоке, это все еще вызывает беспокойство? Я думаю, что либо A, либо B завершатся первыми, а затем A+B. - person loeschg; 10.12.2014
comment
Observable a, Observable b и Observable ab (объединенные наблюдаемые из a и b) могут выполняться в разных потоках. Таким образом, вызов методов onCompleted не может быть последовательным (в отличие от контракта rx). Вы можете прочитать общую переменную с именем hello в методе onCompleted. В следующей строке вы снова читаете ту же переменную. Значение может измениться, так как другой метод onCompleded из другого наблюдаемого может изменить его. - person dwursteisen; 11.12.2014