Почему мой RxJava Observable не излучает или не завершает работу, если он не блокируется?

Задний план

У меня есть несколько RxJava Observables (либо сгенерированных из клиентов Джерси, либо заглушек с использованием Observable.just(someObject)). Все они должны выдавать ровно одно значение. У меня есть компонентный тест, который имитирует всех клиентов Джерси и использует Observable.just(someObject), и я вижу там то же поведение, что и при запуске производственного кода.

У меня есть несколько классов, которые действуют на эти наблюдаемые, выполняют некоторые вычисления (и некоторые побочные эффекты - я мог бы сделать их прямыми возвращаемыми значениями позже) и возвращать пустые наблюдаемые пустоты.

В какой-то момент в одном таком классе я пытаюсь заархивировать несколько исходных наблюдаемых объектов, а затем сопоставить их - что-то вроде следующего:

public Observable<Void> doCalculation() {
    return Observable.zip(
        getObservable1(),
        getObservable2(),
        getObservable3(),
        UnifyingObject::new
    ).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
}

// in Unifying Object
public Observable<Void> processToNewObservable() {
    // ... do some calculation ...
    return Observable.empty();
}

Затем все вычисляющие классы объединяются и ожидаются:

// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
        .toBlocking().lastOrDefault(null);

Проблема

Проблема в том, что processToNewObservable() никогда не выполняется. По процессу исключения я вижу, что это getObservable1(), в этом проблема - если я заменю его на Observable.just(null), все будет выполнено так, как я себе представлял (но с нулевым значением, где мне нужно настоящее).

Повторюсь, getObservable1() возвращает Observable от клиента Jersey в производственном коде, но этот клиент является имитацией Mockito, возвращающей Observable.just(someValue) в моем тесте.

Расследование

Если я конвертирую getObservable1() в блокировку, а затем оборачиваю первое значение в just(), снова все выполняется так, как я себе представлял (но я не хочу вводить этап блокировки):

Observable.zip(
    Observable.just(getObservable1().toBlocking().first()),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())

Моя первая мысль заключалась в том, что, возможно, что-то еще потребляло значение, испускаемое из моего наблюдаемого, и zip видел, что оно уже было завершено, таким образом определяя, что результатом их сжатия должна быть пустая наблюдаемая. Я пробовал добавлять .cache() к каждому наблюдаемому источнику, который, как мне кажется, связан с этим, но это не повлияло на поведение.

Я также пробовал добавить обработчики next / error / complete / finally в getObservable1 (без преобразования его в блокировку) непосредственно перед zip, но ни один из них тоже не выполнил:

getObservable1()
    .doOnNext(...)
    .doOnCompleted(...)
    .doOnError(...)
    .finallyDo(...);

Observable.zip(
    getObservable1(),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())

Вопрос

Я новичок в RxJava, поэтому почти уверен, что мне не хватает чего-то фундаментального. Возникает вопрос: что за глупости я мог делать? Если это не очевидно из того, что я сказал до сих пор, что я могу сделать, чтобы помочь диагностировать проблему?


person Rowan    schedule 13.12.2015    source источник
comment
Будет ли что-нибудь из getObservable асинхронным? Возможно, ваш тест завершится до того, как zip сможет собрать все данные. Вы можете попробовать добавить toBlocking после concatMap, чтобы увидеть, работает это или нет.   -  person akarnokd    schedule 14.12.2015
comment
В тесте нет ничего асинхронного - это просто Observable.just (). Тем не менее, что-то собирает заархивированный и сопоставленный наблюдаемый объект и в конечном итоге вызывает его блокировку. Чтобы быть уверенным, я попытался добавить toBlocking.first () после concatMap и обернуть его с помощью Observable.just () (чтобы получить Observable, а не BlockingObservable, чтобы удовлетворить интерфейс); попытка вызвать исключение NoSuchElementException: последовательность не содержит элементов   -  person Rowan    schedule 14.12.2015


Ответы (3)


Наблюдаемый должен излучать, чтобы начать цепочку. Вы должны думать о своем конвейере как о том, что произойдет, когда Observable будет излучать.

Вы не поделились тем, что на самом деле наблюдалось, но Observable.just () заставляет Observable немедленно испускать завернутый объект.

person John Scattergood    schedule 13.12.2015
comment
Я не понимаю, почему наблюдаемое не испускается. Я отредактировал вопрос, чтобы он был более ясным, но, повторяю, наблюдаемое либо из Джерси (в производственном коде), либо имитация с использованием Observable.just () (в тесте). Пара вопросов: а) если просто () отправит сразу, будут ли подписчики «пропустить» это излучение? б) если вы думаете, что проблема в том, что мой наблюдаемый объект не излучает, почему преобразование его в блокировку может вызывать его излучение? - person Rowan; 14.12.2015
comment
Думаю, сразу было слишком сильно. На самом деле ничего не испускается, пока не подписан Observable. Согласно контракту метода, just () вернет значение, добавляемое в Observable. Кстати, когда вы подписываетесь на цепочку Observable? @akarnokd может быть на чем-то. Я обновлю свой ответ завтра, когда у меня будет больше информации. - person John Scattergood; 14.12.2015
comment
Спасибо, вот что я понял: оперативность, подписка и просто (). Что касается того, когда я подписываюсь, см. Мой отредактированный вопрос для получения более подробной информации. Я думаю, что цепочка правильно подписывается и потребляется, поскольку все работает, если getObservable1 () блокируется или заменяется на Observable.just () - person Rowan; 14.12.2015

Основываясь на ответе в комментарии, либо один из getObservable не возвращает никакого значения, а просто завершается, либо насмешка Mockito делает что-то не так. Следующий автономный пример мне подходит. Не могли бы вы проверить его и начать медленно видоизменять, чтобы увидеть, где что-то ломается?

Observable.zip(
        Observable.just(1),
        Observable.just(2),
        Observable.just(3),
        (a, b, c) -> new Integer[] { a, b, c })
 .concatMap(a -> Observable.from(a))
 .subscribe(System.out::println)
 ;
person akarnokd    schedule 14.12.2015
comment
Согласно моему первоначальному вопросу, вы правы: один из getObservables не излучает - если только он не блокируется, и в этом случае это происходит. Однако я думаю, что понял причину - см. Мой ответ: stackoverflow.com/a/34275572/516439 - person Rowan; 14.12.2015

Примечание. Я не нашел здесь свой ответ очень удовлетворительным, поэтому я покопался немного дальше и нашел гораздо меньший вариант воспроизведения, поэтому я задал здесь новый вопрос: Почему мой RxJava Observable передает только первому потребителю?


Я выяснил, по крайней мере, часть своих проблем (и, извиняюсь перед всеми, кто пытался ответить, я не думаю, что у вас было много шансов, учитывая мое объяснение).

Все классы, выполняющие эти вычисления, возвращали Observable.empty() (согласно processToNewObservable() в моем исходном примере). Насколько я могу судить, Observable.zip() не подписывается на N-й наблюдаемый объект, который он архивирует, пока N-1-й наблюдаемый не выдаст значение.

В моем первоначальном примере утверждалось, что это было getObservable1(), которое плохо себя ведет - на самом деле это было немного неточно, позже это было обнаружено в списке параметров. Насколько я понимаю, причина, по которой он заблокировал, а затем снова превратил это значение в Observable, сработала, потому что его блокировка и вызов сначала принудительно его выполнение, и я получил побочные эффекты, которые хотел.

Если я изменю все свои вычисляющие классы, чтобы вместо этого возвращать Observable.just(null), все будет работать: последние zip() наблюдаемых всех классов вычислений работают через них всех, поэтому возникают все ожидаемые побочные эффекты.

Возврат нулевого значения Void кажется, что я определенно делаю что-то не так с точки зрения дизайна, но, по крайней мере, на этот конкретный вопрос есть ответ.

person Rowan    schedule 14.12.2015
comment
Я рад, что ты нашел ответ. - person John Scattergood; 14.12.2015