RxScala подписывается с несколькими наблюдателями, просто выдает событие первому

Я пытаюсь использовать несколько Observer подписаться на Observable, который onNext произошел в цикле. Кажется, это не работает для каждого наблюдателя.

import rx.lang.scala.Observable

object SubscribeMultiEvent extends App{
  val obv = Observable.apply[String]{ s =>
    def printForever: Unit = {
      s.onNext("hi~")
      Thread.sleep(1000)
      printForever
    }
    printForever
  }

  obv.subscribe(s => println(s"first observer - $s"))
  obv.subscribe(s => println(s"second observer - $s"))

  Thread.currentThread().join()
}

Ответ только для первого наблюдателя

first observer - hi~
first observer - hi~
...

Почему второй не может получить подписку? Спасибо


person LoranceChen    schedule 21.03.2016    source источник


Ответы (1)


Проблема в ваших кодах заключается в том, что ваш Observable синхронен. Это означает, что второй subscribe не запустится, пока не завершится первый subscribe. А так как ваш Observable никогда не будет завершен, второй subscribe не сможет запуститься.

Чтобы решить эту проблему, вам нужно сделать Observable асинхронным. Вы можете использовать subscribeOn для запуска в другом потоке. Например.,

import rx.lang.scala.Observable
import rx.lang.scala.schedulers.NewThreadScheduler

object SubscribeMultiEvent extends App{
  val obv = Observable.apply[String]{ s =>
    def printForever: Unit = {
      s.onNext("hi~")
      Thread.sleep(1000)
      printForever
    }
    printForever
  }.subscribeOn(NewThreadScheduler())

  obv.subscribe(s => println(s"first observer - $s"))
  obv.subscribe(s => println(s"second observer - $s"))

  Thread.sleep(60000)
}

Thread.sleep(60000) в конце важно. Потоки RxJava по умолчанию являются демонами, и если основной поток завершится, поскольку больше нет потоков, не являющихся демонами, JVM завершит работу. Чтобы основной поток не останавливался, вам нужно добавить что-то вроде Thread.sleep(60000).

person zsxwing    schedule 21.03.2016
comment
Спасибо, это здорово. Кроме того, могу ли я создать событие цикла, выдающее именно таким образом, но оно каждый раз итерирует подписку на тело для всех наблюдателей? Я пока не хочу использовать новый поток для каждой подписки. - person LoranceChen; 22.03.2016
comment
Похоже, вам нужно publish. См. этот пример: github. com/ReactiveX/RxScala/blob/ - person zsxwing; 22.03.2016