RxJs: доступ к данным перед flatMapLatest после завершения flatMapLatest

Сценарий:

  1. Пользователь использует фильтры, объединенные в единый поток
  2. Когда фильтры меняются, событие для бэкэнда запускается для получения «дешевых» данных.
  3. Когда поступают «дешевые» данные, другой запрос с теми же параметрами отправляется в другую конечную точку, которая возвращает «дорогие» данные, которые будут использоваться для обогащения дешевых данных. Запрос должен быть задержан на 1 секунду и запущен только в том случае, если пользователь не меняет ни один из фильтров (иначе он должен ждать 1 секунду).

И я борюсь с 3) вариантом без промежуточных переменных.

let filterStream = Rx.Observable
.combineLatest(
  filterX,
  filterY,
  (filterX, filterY) => {
    x: filterX,
    y: filterY
  }
 )
 .map((filters) => {
  limit: 100,
  s: filters.x.a,
  f: filters.x.b + filters.y.c,
})
.distinctUntilChanged()


let cheapDataStream = filterStream
.flatMapLatest((filterQuery) =>
Rx.Observable.fromPromise(cheapBackendApiCall(filterQuery)))

// render cheap results
cheapDataStream
.map(result => transformForDisplay(result))
.subscribe(result => { 
  //render
  // how do i invoke expensiveApiCall() with `filterQuery` data here?
  // with a delay, and only if filterQuery has not changed?

});

person gerasalus    schedule 22.03.2016    source источник
comment
Так дорогие данные = (дешевые данные + дополнительные) или дорогие данные = дополнительные и вам нужно их комбинировать самостоятельно?   -  person paulpdaniels    schedule 22.03.2016
comment
дорогие данные лишние, и мне нужно их объединить. Предположим, что дешевые данные — это список продуктов, а дорогие данные — это доступность этих продуктов — сначала вы получаете все продукты, а затем вы получаете доступность, которая возвращается только в том случае, если у продукта она есть.   -  person gerasalus    schedule 22.03.2016


Ответы (3)


Вы можете воспользоваться преимуществом неявного преобразования, чтобы избежать явного использования fromPromise везде. Затем вы можете использовать concat для немедленного возврата сначала дешевых данных, а затем дорогих + дешевых данных с задержкой. Вложив это в flatMapLatest, поток также отменит любой ожидающий expensiveCalls, если поступит новый запрос.

var filters = Rx.Observable
.combineLatest(
  filterX,
  filterY,
  (filterX, filterY) => {
    x: filterX,
    y: filterY
  }
 )
 .map((filters) => {
  limit: 100,
  s: filters.x.a,
  f: filters.x.b + filters.y.c,
})
.distinctUntilChanged()
.flatMapLatest(filters => {
  //This kicks off immediately
  var cheapPromise = cheapBackendApiCall(filters);

  //This was added in the latest version 4.1, the function is only called once it is subscribed to, 
  //if you are using earlier you will need to wrap it in a defer instead.
  var expensivePromiseFn = () => expensiveBackendApiCall(filters);

  //For join implicitly calls `fromPromise` so you can pass the same 
  // sort of arguments.
  var cheapAndExpensive = Rx.Observable.forkJoin(
                            cheapPromise, 
                            expensivePromiseFn, 
                            (cheap, expensive) => ({cheap, expensive}));

  //First return the cheap, then wait 1500 millis before subscribing 
  //which will trigger the expensive operation and join it with the result of the cheap one
  //The parent `flatMapLatest` guarantees that this cancels if a new event comes in
  return Rx.Observable.concat(cheap, cheapAndExpensive.delaySubscription(1500));
})
.subscribe(x => /*Render results*/);
person paulpdaniels    schedule 22.03.2016
comment
Ницца. Единственным недостатком является то, что forkJoin включен только в rx.all.js, что добавляет +10 КБ для библиотеки по сравнению с rx.lite.js. - person gerasalus; 23.03.2016
comment
Я не следил за модульной работой, но я полагаю, что недавно было дополнение, позволяющее делать пользовательские сборки только с теми операторами, которые вам нужны. - person paulpdaniels; 23.03.2016

Вы ищете debounce ?
Кажется, этот оператор делает именно то, что вы описали.

person Curious Sam    schedule 22.03.2016
comment
Если я добавлю debounce() в CheapDataStream, он также задушит CheapDataStream, чего я не хочу. Если я создаю новый Rx.Observable в методе подписки, debounce помогает. Может быть, я мог бы использовать takeUntil в подписке с новым наблюдаемым, который не работает, если пока мы задерживаемся, пользователь меняет некоторые фильтры - person gerasalus; 22.03.2016
comment
@gerasalus, вы можете решить это по-разному. Например, вы создаете 2 подписки подешевле и по полной. Вы устанавливаете только базовую информацию в дешевой подписке без отмены отказов и еще одну подписку на полную информацию после устранения отказов. - person Curious Sam; 22.03.2016

Решение, которое я придумал, не уверен, что do() подходит, а дешевое + дорогое слияние данных выглядит не очень реактивным.

Rx.Observable
.combineLatest(
  filterX,
  filterY,
  (filterX, filterY) => {
    x: filterX,
    y: filterY
  }
 )
 .map((filters) => {
  limit: 100,
  s: filters.x.a,
  f: filters.x.b + filters.y.c,
})
.distinctUntilChanged()
.flatMapLatest((filterQuery) =>
  Rx
  .Observable
  .fromPromise(cheapBackendApiCall(filterQuery))
  .map((results) => {
    filterQuery: filterQuery,
    results: results
  })
)
.do((filtersAndResults) => {
  // render filtersAndResults.results
})
.debounce(1500)
.flatMapLatest((filtersAndResults) => {
  return Rx
  .Observable
  .fromPromise(expensiveBackendApiCall(filtersAndResults.filterQuery))
  .map(results => {
    expensiveData: results,
    cheapData: filtersAndResults.results
  })
})
.subscribe((result)=> {
  // combine results.cheapData + results.expensiveData with simple .map and .find
  //  and render
})
person gerasalus    schedule 22.03.2016