Как правильно спроектировать подчиненных подписчиков, которые вызывают перезагрузку восходящего потока в RxJS?

Попытка построить расписание с использованием RxJS v5, где определенные события могут вызвать перезагрузку расписания. В настоящее время используются 3 источника — schedule$, event$ и userNotification$ (пример ниже).

Я пробовал несколько разных стратегий, и я постоянно получаю странности, такие как рекурсивные перезагрузки, когда наступает время события reloadSchedule. Есть ли способ, чтобы нисходящие данные (event$) корректно инициировали перезагрузку восходящего потока (schedule$) без каких-либо действий/уведомлений, оставшихся от предыдущих элементов расписания?

schedule$ = new Rx.BehaviorSubject(
  {schedule:[
    {start:'1pm', end:'2pm', action:'sayhi'},
    {start:'2pm', end:'3pm', action:'sayhi'},
    {start:'3pm', end:'3pm', action:'reloadSchedule'},
    {start:'3:01pm', end:'4pm', action:'sayhi'},
  ]}
);

function loadSchedule(){
  somethingAsync.then((moreData)=>schedule$.next(moreData));
}

event$ = schedule$.flatMap((data)=>{
  return Rx.Observable
    .from(data.schedule)
    .flatMap((event)=>{
      return Rx.Observable.timer(event.start)
      .flatMap(()=>{
        // do actions here once previous actions/notifications finish
        if(event.action === 'reloadSchedule'){
          loadSchedule()
        }
        return Rx.Observable.of(someUserMessage);
      })
    })
})

userNotification$ = Rx.Observable.timer(1000).withLatestFrom(event$)
.flatMap((someUserMessage)={
  // fade message after 5 seconds
});

userNotification.subscribe(()=>{});

person Adam    schedule 15.03.2016    source источник
comment
Что такое Schedule$ и чем оно отличается от ScheduleData$?   -  person paulpdaniels    schedule 15.03.2016
comment
Спасибо, что спросили Пола. Я исправил имена переменных. Также придумал решение, которое работает (не обязательно самое чистое, но работает). Выкладываю в пример другим.   -  person Adam    schedule 26.03.2016


Ответы (1)


В итоге нашел решение. Возможно, есть более чистые способы сделать это, но это сработало.

Основная идея состоит в том, чтобы иметь один таймер, который контролирует действия. Сравните время события с этим таймером, чтобы получить правильное текущее событие. Отпишитесь, когда придет время перезагрузить.

Грубый пример.

// start and end are ISO strings - showing 1pm etc.
let schedule$ = new Rx.BehaviorSubject([
  {start:'1pm', end:'2pm', action:'sayhi'},
  {start:'2pm', end:'3pm', action:'sayhi'},
  {start:'3pm', end:'3pm', action:'reloadSchedule'},
  {start:'3:01pm', end:'4pm', action:'sayhi'},
]);

schedule$.subscribe((sched)=>{
  new Scheduler(sched)
});

function loadSchedule(){
  somethingAsync.then((moreData)=>schedule$.next(moreData));
}

class Scheduler{
  constructor(schedule){
    let notificationsCleared = true;
    let sliced;
    let event$ = Rx.Observable
      .timer(1000)
      .filter(()=>notificationsCleared)
      .map(()=>{
        let now = (new Date()).toISOString();
        sliced || (sliced = schedule.slice(0));
        while (now > sliced[0].end){
          sliced.shift();
        }
        return sliced[0];
      }).share();

    let cleanup$ = event$.filter((evt)=>evt.action === 'reloadSchedule')

    let userNotification$ = event$.map(()=>{
      notificationsCleared = false;
      someAsyncNotification()
      .then(()=>notificationsCleared = true)
    });

    let userSub = userNotification.subscribe(()=>{});
    let cleanupSub = cleanup$.subscribe(()=>{
      loadSchedule();
      userSub.unsubscribe();
      cleanupSub.unsubscribe();
    });
  }
};
person Adam    schedule 26.03.2016