Как создать один Observable из другого

Допустим, у нас есть исходный Observable of Ints:

val source:Observable[Int]

Я хотел бы создать еще один Observable, производящий значения, разница которых с первым появившимся значением в source больше 10:

def detect() = Observable[Int](
  subscriber =>
    if (!subscriber.isUnsubscribed) {
      var start:Option[Int] = None
      source.subscribe(
        item => {
          if (start.isEmpty) {
            start = Option(item)
          }
          else {
            start.filter(v => Math.abs(item - v) > 10).foreach {
              item => subscriber.onNext(item)
            }
          }
        }
      )
      subscriber.onCompleted()
    }
)

Здесь я использовал var start для хранения первого значения source Observable.

Есть ли способ упростить этот код? Мне не нравится этот подход с присвоением значения переменной


person Nyavro    schedule 26.08.2016    source источник


Ответы (2)


Вот что я придумал:

import rx.lang.scala.Observable

val source = Observable.from(List(5, 2, 3, 16, -40, 2, -70, 50))

source.scan(Option.empty[(Int, Int)]) { (acc, next) =>
  acc.map(_.copy(_2 = next)) orElse Some((next, next))
}.collect {
  case Some((start, current)) if math.abs(start - current) > 10 => current
}.subscribe(x => println(x))

отпечатки

16
-40
-70
50

в основном сканирование сохраняет аккумулятор, который может быть неинициализирован (None) или может содержать пару: первое значение и последний элемент, испускаемый из источника. Затем мы собираем только те элементы, которые соответствуют вашему предикату.

person Łukasz    schedule 26.08.2016

Вам просто нужно применить оператор filter, который создает новый Observable, который отражает выбросы исходного наблюдаемого, но пропускает те, для которых предикат проверяет ложь:

val filtered = source.filter(v => Math.abs(item - v) > 10)
person Tassos Bassoukos    schedule 26.08.2016
comment
как мне сохранить первое значение источника? Я сравниваю каждое следующее значение источника с первым значением - person Nyavro; 26.08.2016