Исключение не распространяется на onError() при вызове из doOnSubscribe()

Существует PublishProcessor, который вызывает оператора .doOnSubscribe(checkCondition) для выполнения определенной проверки. checkCondition предназначен для выбрасывания UnsupportedOperationException, который затем должен быть передан подписчику по потоку до метода onError(). Вместо этого выбрасывается UndeliverableException и происходит сбой процесса.

publishProcessor
    .filter(() -> { // predicate })
    .observeOn(scheduler)
    .doOnSubscribe(checkCondition)
    .to((sourceFlowable) -> new FancyFlowable(sourceFlowable)))
    .safeSubscribe(subscriber);

Кто-нибудь знает, что здесь происходит не так? Почему исключение, выброшенное из checkCondition, не распространяется на onError подписчика?


person Araz Abishov    schedule 16.02.2017    source источник
comment
Наблюдаемое поведение связано с ошибкой. Исправление в #5103.   -  person akarnokd    schedule 17.02.2017


Ответы (2)


Это ошибка оператора doOnSubscribe в RxJava 2. Если обратный вызов onSubscribe дает сбой, Throwable в этот момент можно сигнализировать через onError на законных основаниях. Я опубликую исправление для этого в ближайшее время.

На данный момент этот комментарий может быть обходным путем для вас:

Observable.defer(() -> checkCondition ? error(...) : publishProcessor)

person akarnokd    schedule 16.02.2017

Возможно, потому что это не имеет смысла; doOnSubscribe запускается до того, как наблюдаемая цепочка будет правильно инициализирована. Вы рассматривали что-то подобное?

if(checkCondition) {
  Observable.error(...).safeSubscribe(subscriber);
}

А затем продолжите с исходным кодом (без doOnSubscribe).

person Tassos Bassoukos    schedule 16.02.2017
comment
Условие может измениться после подготовки Observable и до того, как подписчик подпишется, поэтому предварительная проверка условия не решит проблему. - person Araz Abishov; 16.02.2017
comment
Ооо, я знаю! Observable.defer(() -> checkCondition ? error(...) : publishProcessor), и цепочка в обычном режиме. - person Tassos Bassoukos; 17.02.2017