Объедините несколько наблюдаемых с разными действиями / операциями

Я создаю приложение Angular2, поэтому привыкаю к ​​Observables и Reactive Extensions в целом. Я использую TypeScript и rxjs.

Теперь у меня есть наблюдаемая или, если хотите, поток массива некоторых объектов. Скажем, Человек-объекты. Теперь у меня есть два других потока объектов Person, и я хочу их объединить, чтобы получить поток, который всегда актуален:

var people$ = getPeople();                  // Observable<Person[]>
var personAdded$ = eventHub.personAdded;    // Observable<Person>;
var personRemoved$ = eventHub.personRemoved // Observable<Person>;

var allwaysUpToDatePeople$ = people$.doSomeMagic(personAdded$, personRemoved$, ...);

Если поток people-stream испускает массив, скажем, из 5 человек, а после этого поток personAdded-stream испускает человека, поток allPeople-stream будет генерировать массив из 6. Если поток personRemoved-stream испускает человека, allPeople- stream должен испускать массив объектов Person, а не тот, который только что испускал personRemoved-stream.

Есть ли способ, встроенный в rxjs, чтобы добиться такого поведения?


person YentheO    schedule 14.10.2016    source источник


Ответы (2)


Я предлагаю вам обернуть идею action в поток, который затем можно объединить и применить непосредственно к Array.

Первый шаг - определить некоторые функции, описывающие ваши действия:

function add(people, person) {
  return people.concat([people]);
}

function remove(people, person) {
  const index = people.indexOf(person);
  return index < 0 ? people : people.splice(index, 1);
}

Примечание: мы избегаем изменения массива на месте, потому что это может иметь непредвиденные побочные эффекты. Чистота требует, чтобы вместо этого мы создали копию массива.

Теперь мы можем использовать эти функции и поднять их в поток, чтобы создать Observable, который испускает функции:

const added$ = eventHub.personAdded.map(person => people => add(people, person));
const removed$ = eventHub.personRemoved.map(person => people => remove(people, person));

Теперь мы получаем события в форме: people => people, где входом и выходом будет массив людей (в этом примере упрощен до простого массива строк).

Как бы нам это подключить? На самом деле мы заботимся только о добавлении или удалении этих событий после, у нас есть массив для их применения:

const currentPeople = 

  // Resets this stream if a new set of people comes in
  people$.switchMap(peopleArray => 

    // Merge the actions together 
    Rx.Observable.merge(added$, removed$)

      // Pass in the starting Array and apply each action as it comes in
      .scan((current, op) => op(current), peopleArray)

      // Always emit the starting array first
      .startWith(people)
  )
  // This just makes sure that every new subscription doesn't restart the stream
  // and every subscriber always gets the latest value
  .shareReplay(1);

Есть несколько оптимизаций этого метода в зависимости от ваших потребностей (т.е. избегая каррирования функции или использования двоичного поиска), но я считаю, что это относительно элегантно для общего случая.

person paulpdaniels    schedule 14.10.2016
comment
Хорошее объяснение, я попробую, но я понимаю, о чем вы говорите! Я думаю, люди хотят этим заниматься чаще, не так ли? - person YentheO; 06.11.2016

Вы хотите объединить все потоки (стиль Ghostbusters), а затем использовать оператор сканирования для определения состояния. Оператор сканирования работает как сокращение Javascript.

Вот демо ...

const initialPeople = ['Person 1', 'Person 2', 'Person 3', 'Person 4'];

const initialPeople$ = Rx.Observable.from(initialPeople);

const addPeople = ['Person 5', 'Person 6', 'Person 7'];

const addPeople$ = Rx.Observable.from(addPeople)
            .concatMap(x => Rx.Observable.of(x).delay(1000)); // this just makes it async

const removePeople = ['Person 2x', 'Person 4x'];

const removePeople$ = Rx.Observable.from(removePeople)
                                              .delay(5000)
                                                .concatMap(x => Rx.Observable.of(x).delay(1000));

const mergedStream$ = Rx.Observable.merge(initialPeople$, addPeople$, removePeople$)

mergedStream$
  .scan((acc, stream) => {
        if (stream.includes('x') && acc.length > 0) {
            const index = acc.findIndex(person => person === stream.replace('x', ''))
            acc.splice(index, 1);
        } else {
            acc.push(stream);
        }
      return acc;
  }, [])
  .subscribe(x => console.log(x))

// In the end, ["Person 1", "Person 3", "Person 5", "Person 6", "Person 7"]

http://jsbin.com/rozetoy/edit?js,console

Вы не упомянули структуру своих данных. Мое использование «x» как флага немного (много) неуклюже и проблематично. Но я думаю, вы видите, как можно изменить оператор сканирования, чтобы он соответствовал вашим данным.

person D. Walsh    schedule 14.10.2016