Обеспечьте порядок обновления подписчиков

Есть ли способ убедиться, что порядок обновления подписчиков обеспечен?

У меня есть горячая наблюдаемая, и мой первый подписчик выполняет некоторую работу по синхронизации, чтобы обновить переменную, а мой следующий подписчик затем должен инициализировать службу (только один раз!), И только после того, как эта переменная будет установлена!

это выглядит так:

import App from './App'

var appSource = App.init() // gets the hot observable

// our second subscriber
appSource.take(1).subscribe(() => {
  // take 1 to only run this once
  nextService.init()
})

где App.init выглядит так:

...
init() {
  var source = this.createObservable() // returns a hot interval observable that fetches a resource every few minutes

  // first subscriber, updates the `myVar` every few minutes
  source.subscribe((data) => this.myVar = data)

  return source
}
...

в настоящее время это работает, но я не уверен, всегда ли он будет следовать порядку на 100%.

ИЗМЕНИТЬ:

Как я слышал, подписчики будут вызываться FIFO. Так что порядок несколько обеспечен.


person Martin Broder    schedule 29.07.2015    source источник


Ответы (1)


Я не знаю, гарантирует ли RxJS когда-либо явно, что наблюдатели вызываются в порядке подписки. Но, как вы говорите, обычно это работает.

Однако вы можете подумать о моделировании вашего фактического рабочего процесса вместо того, чтобы полагаться на неявный порядок наблюдателя.

Похоже, вам нужно знать, когда ваше приложение инициализировано, чтобы вы могли предпринять дальнейшие действия. Вместо того, чтобы полагаться на знание внутренней работы App.init, App может предоставить для этого API:

Один (не Rx-способ) состоит в том, чтобы позволить вызывающей стороне предоставить обратный вызов init:

//...
init(callback) {
  var source = this.createObservable() // returns a hot interval observable that fetches a resource every few minutes

  // first subscriber, updates the `myVar` every few minutes
  source.subscribe((data) => {
    this.myVar = data;
    if (callback) {
        callback();
        callback = undefined;
    }
  })

  return source
}

// elsewhere
App.init(() => nextService.init());

Другой вариант вместо обратного вызова — просто заставить init возвращать Promise, который вы разрешаете (или Rx.AsyncSubject, который вы сигнализируете) после завершения инициализации.

И еще один вариант, но требующий некоторого рефакторинга, заключается в том, чтобы смоделировать this.myVar как наблюдаемые данные, которыми они являются. то есть:

init() {
    this.myVar = this.createObservable().replay(1);
    this.myVar.connect();
    // returns an observable that signals when we are initialized
    return this.myVar.first();
}

// elsewhere, you end up with this pattern...
const servicesToInit = [ App, service1, service2, service3 ];
Observable
    .of(servicesToInit)
    .concatMap(s => Rx.Observable.defer(() => s.init()))
    .toArray()
    .subscribe(results => {
        // all initializations complete
        // results is an array containing the value returned by each service's init observable
    });

Теперь все, что хочет использовать myVar, всегда должно каким-то образом подписаться на него, чтобы получить текущие и/или будущие значения. Они никогда не могли просто синхронно запросить текущее значение.

person Brandon    schedule 29.07.2015
comment
Эй, @Брэндон, спасибо за отличное объяснение! Вопрос о myVar: в настоящее время myVar представляет данные, поступающие из ресурса, который я должен получать каждые несколько минут. Чтобы получить эти данные, я просто добавил App.getData(), который возвращает this.myVar всякий раз, когда мне это нужно. Мне пришлось бы реорганизовать мой getMethod на что-то вроде getData => this.myVar.takeLast(1), чтобы получить последние данные, верно? - person Martin Broder; 29.07.2015
comment
Еще хуже: вам придется сделать getData async: getData => this.myVar.first() и назвать это так: getData().subscribe(data => value available here);. Async имеет тенденцию распространяться, как только вы начинаете его использовать. Вот почему я сказал, что это своего рода рефакторинг, если вы в настоящее время представляете его как синхронно доступный. - person Brandon; 29.07.2015
comment
Один из вариантов сделать это: this.myVarAsync = this.createObservable().do(val => this.myVar = val).replay(1);. Это сделало бы самое последнее значение доступным в виде простой переменной, а асинхронно изменяющееся значение стало бы доступным в качестве наблюдаемого для всех, кому оно нужно. - person Brandon; 29.07.2015
comment
Понятно, думаю, я выберу обратный вызов, так как здесь нет необходимости сходить с ума. Большое спасибо! - person Martin Broder; 29.07.2015