RxJS/ReactiveX Правильная связь модулей

Я новичок в реактивном программировании, но уже влюблен. Однако все еще трудно переключить мой мозг на это. Я стараюсь следовать всем рекомендациям, таким как «Избегайте использования тем» и «Избегайте нечистых функций» и, конечно же, «Избегайте императивного кода».

Чего мне трудно достичь, так это простой связи между модулями, когда один модуль может регистрировать «действие»/наблюдаемое, а другой может подписываться и реагировать на него. Простая шина сообщений, вероятно, будет работать, но это заставит использовать субъекты и императивный стиль кода, которого я пытаюсь избежать.

Итак, вот простая отправная точка, с которой я играю:

    // some sandbox
class Api {
  constructor() {
    this.actions = {};
  }

  registerAction(actionName, action) {
    // I guess this part will have to be changed
    this.actions[actionName] = action.publishReplay(10).refCount();
    //this.actions[actionName].connect();
  }

  getAction(actionName) {
    return this.actions[actionName];
  }
}

const api = new Api();

// -------------------------------------------------------------------
// module 1
let myAction = Rx.Observable.create((obs) => {
  console.log("EXECUTING");
  obs.next("42 " + Date.now());
  obs.complete();
});

api.registerAction("myAction", myAction);

let myTrigger = Rx.Observable.interval(1000).take(2);

let executedAction = myTrigger
.flatMap(x => api.getAction("myAction"))
.subscribe(
  (x) => { console.log(`executed action: ${x}`); },
  (e) => {}, 
  () => { console.log("completed");});

// -------------------------------------------------------------------
// module 2
api.getAction("myAction")
  .subscribe(
  (x) => { console.log(`SECOND executed action: ${x}`); },
  (e) => {}, 
  () => { console.log("SECOND completed");});

Итак, в настоящее время, когда второй модуль подписывается, он «запускает» Observable «myAction». И в реальном сценарии это может быть вызов ajax. Есть ли способ заставить всех подписчиков задержать/ждать, пока «myAction» не будет правильно вызван из модуля 1? И снова - это легко сделать с помощью предметов, но я пытаюсь сделать это, следуя рекомендуемым практикам.


person Pavel Kolev    schedule 03.11.2016    source источник
comment
Что вы подразумеваете под myAction правильно вызывается из модуля1? Вы имеете в виду, пока он не завершится или что?   -  person martin    schedule 03.11.2016
comment
Да. полный будет работать   -  person Pavel Kolev    schedule 03.11.2016


Ответы (2)


Если я вас правильно понимаю, вы хотите убедиться, что, если вы вызываете api.getAction, вы хотите, чтобы следующие значения в этом наблюдаемом дождались завершения вызова getAction. Перед обработкой других значений.

Это то, чего вы можете легко добиться, используя concatMap. ConcatMap возьмет функцию, которая возвращает наблюдаемое (в вашем случае вызов getAction). ConcatMap будет ждать, чтобы начать обработку следующего значения, пока наблюдаемое, возвращаемое функцией, не завершится.

Поэтому, если вы измените свой код таким образом, он должен работать (если я правильно понял).

let executedAction = myTrigger
.concatMap(x => api.getAction("myAction"))
.subscribe(
  (x) => { console.log(`executed action: ${x}`); },
  (e) => {}, 
  () => { console.log("completed");});

Если myTrigger имеет новое значение, оно не будет обработано до тех пор, пока не завершится наблюдаемое, возвращенное из api.getAction.

person KwintenP    schedule 03.11.2016
comment
Спасибо. Это улучшение, которое я рассматривал. Однако моя главная проблема заключается в том, что подписка из модуля 2 вызывает выброс раньше, чем подписка из модуля 1. Итак, я думаю, мне нужна какая-то задержка до испускания, а затем подписка? Но насколько я вхожу в документацию, задержка может быть только по времени (используя rxjs5) - person Pavel Kolev; 03.11.2016
comment
Таким образом, вы хотите, чтобы вызов из модуля 1 завершился до обработки вызова из модуля 2. Но вопрос в том, если это вызов AJAX, хотите ли вы, чтобы вызов выполнялся дважды, или вы хотите, чтобы вызов из второго модуля инициировал второй запрос? - person KwintenP; 03.11.2016
comment
Я просто хочу, чтобы подписка из модуля 2 слушал только без запуска запроса - person Pavel Kolev; 04.11.2016
comment
По мере того, как я копаю глубже, я думаю, что лучший способ добиться этого — с помощью планировщика? пользовательский. Но не могу понять, можете ли вы создавать собственные планировщики. - person Pavel Kolev; 04.11.2016

Итак, вот гораздо более простое решение, чем то, которое я думал. Просто используя 2 наблюдаемых. Аналогичного эффекта можно добиться с помощью планировщиков и subscribeOn.

    // some sandbox
class Action {
  constructor(name, observable) {
    this.name = name;
    this.observable = observable;
    this.replay = new Rx.ReplaySubject(10);
  }
}

function actionFactory(action, param) {

  return Rx.Observable.create(obs => {
    action.observable
    .subscribe(x => {
        obs.next(x);
        action.replay.next(x);
    }, (e) => {}, () => obs.complete);
  }); 
}

class Api {
  constructor() {
    this.actions = {};
  }

  registerAction(actionName, action) {
    let generatedAction = new Action(actionName, action);

    this.actions[actionName] = generatedAction;

    return actionFactory.bind(null, generatedAction);
  }

  getAction(actionName) {
    return this.actions[actionName].replay;
  }
}

const api = new Api();

// -------------------------------------------------------------------
// module 1
let myAction = Rx.Observable.create((obs) => {
  obs.next("42 " + Date.now());
  obs.complete();
});

let myRegisteredAction$ = api.registerAction("myAction", myAction);

let myTrigger = Rx.Observable.interval(1000).take(1).delay(1000);

let executedAction = myTrigger
.map(x => { return { someValue: x} })
.concatMap(x => myRegisteredAction$(x))
.subscribe(
  (x) => { console.log(`MAIN: ${x}`); },
  (e) => { console.log("error", e)}, 
  () => { console.log("MAIN: completed");});


// -------------------------------------------------------------------
// module 2
 var sub = api.getAction("myAction")
  .subscribe(
  (x) => { console.log(`SECOND: ${x}`); },
  (e) => {console.log("error : " + e)}, 
  () => { console.log("SECOND: completed");});
person Pavel Kolev    schedule 10.11.2016