Обработка наблюдаемых исключений

Я изучаю RxScala и пришел к этому очень синтетическому фрагменту. Я пытаюсь обработать исключение в блоке onError:

def doLongOperation():String = {
  (1 to 10).foreach {
    _ =>
      Thread.sleep(100)
      print(".")
  }
  println()
  if (System.currentTimeMillis() % 2 == 0) {
    throw new RuntimeException("Something went wrong during long operation")
  }
  s"OK"
}

def main(args: Array[String]) {
  println("Changing status:  -> doing task1")
  Observable.just(
    doLongOperation()
  ).subscribe(
    str => println("Changing status: doing task1 -> doing task2"),
    throwable => println(s"Failed: ${throwable.getMessage}"),     //never get here
    () => println("Completed part")
  )
}

В случае исключения я ожидаю что-то вроде:

Failed: Something went wrong during long operation

Но что я получаю:

.........Exception in thread "main" java.lang.RuntimeException: Something went wrong during long operation
at stats.STest$.doLongOperation(STest.scala:20)
at stats.STest$.main(STest.scala:49)
at stats.STest.main(STest.scala)

Что мне не хватает? Должен ли я «вручную» вызывать onError у наблюдателя? Благодарим за любую помощь.


person Nyavro    schedule 05.11.2015    source источник


Ответы (2)


Проблема заключается в неправильном истолковании функции just(). Он принимает существующее значение во время сборки последовательности, а не метод, который выполняется при подписке подписчика. Другими словами, ваш код делает следующее:

var tempValue = doLongOperation();

Observable.just(tempValue).subscribe(...)

и бросает задолго до того, как Observable будет создан.

(Извините, я недостаточно знаю Scala или RxScala, поэтому извините за мои примеры на Java 8.)

Я не знаю, насколько RxScala отстает от RxJava, но в RxJava 1.0.15 есть новый фабричный метод fromCallable, который позволяет вам отложить одно значение:

Observable.fromCallable(() -> doLongOperation()).subscribe(...)

Альтернативой является обернуть исходный код в defer, чтобы при срабатывании doLongOperation он направлялся подписчику:

Observable.defer(() -> Observable.just(doLongOperation())).subscribe(...)
person akarnokd    schedule 05.11.2015

Observable.just плохо обрабатывает случай исключения, не совсем уверен, является ли это ошибкой или ожидаемым поведением. Вы можете попробовать это:

Observable.create[String]( o => {
  o.onNext(doLongOperation())
  o.onCompleted()
  Subscription{}
}).subscribe(
  str => println("Changing status: doing task1 -> doing task2"),
  throwable => println(s"Failed: ${throwable.getMessage}"),    here
  () => println("Completed part")
)
person Biju Kunjummen    schedule 05.11.2015