Задний план
У меня есть несколько RxJava Observables (либо сгенерированных из клиентов Джерси, либо заглушек с использованием Observable.just(someObject)
). Все они должны выдавать ровно одно значение. У меня есть компонентный тест, который имитирует всех клиентов Джерси и использует Observable.just(someObject)
, и я вижу там то же поведение, что и при запуске производственного кода.
У меня есть несколько классов, которые действуют на эти наблюдаемые, выполняют некоторые вычисления (и некоторые побочные эффекты - я мог бы сделать их прямыми возвращаемыми значениями позже) и возвращать пустые наблюдаемые пустоты.
В какой-то момент в одном таком классе я пытаюсь заархивировать несколько исходных наблюдаемых объектов, а затем сопоставить их - что-то вроде следующего:
public Observable<Void> doCalculation() {
return Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
}
// in Unifying Object
public Observable<Void> processToNewObservable() {
// ... do some calculation ...
return Observable.empty();
}
Затем все вычисляющие классы объединяются и ожидаются:
// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
.toBlocking().lastOrDefault(null);
Проблема
Проблема в том, что processToNewObservable()
никогда не выполняется. По процессу исключения я вижу, что это getObservable1()
, в этом проблема - если я заменю его на Observable.just(null)
, все будет выполнено так, как я себе представлял (но с нулевым значением, где мне нужно настоящее).
Повторюсь, getObservable1()
возвращает Observable от клиента Jersey в производственном коде, но этот клиент является имитацией Mockito, возвращающей Observable.just(someValue)
в моем тесте.
Расследование
Если я конвертирую getObservable1()
в блокировку, а затем оборачиваю первое значение в just()
, снова все выполняется так, как я себе представлял (но я не хочу вводить этап блокировки):
Observable.zip(
Observable.just(getObservable1().toBlocking().first()),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
Моя первая мысль заключалась в том, что, возможно, что-то еще потребляло значение, испускаемое из моего наблюдаемого, и zip
видел, что оно уже было завершено, таким образом определяя, что результатом их сжатия должна быть пустая наблюдаемая. Я пробовал добавлять .cache()
к каждому наблюдаемому источнику, который, как мне кажется, связан с этим, но это не повлияло на поведение.
Я также пробовал добавить обработчики next / error / complete / finally в getObservable1 (без преобразования его в блокировку) непосредственно перед zip, но ни один из них тоже не выполнил:
getObservable1()
.doOnNext(...)
.doOnCompleted(...)
.doOnError(...)
.finallyDo(...);
Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
Вопрос
Я новичок в RxJava, поэтому почти уверен, что мне не хватает чего-то фундаментального. Возникает вопрос: что за глупости я мог делать? Если это не очевидно из того, что я сказал до сих пор, что я могу сделать, чтобы помочь диагностировать проблему?
getObservable
асинхронным? Возможно, ваш тест завершится до того, какzip
сможет собрать все данные. Вы можете попробовать добавитьtoBlocking
после concatMap, чтобы увидеть, работает это или нет. - person akarnokd   schedule 14.12.2015