Кэшировать Http-запросы, используя только операторы RxJS

Я пытаюсь добиться того, что описано здесь: https://www.prestonlamb.com/blog/rxjs-cache-and-refresh-in-angular

Другими словами, я хочу кэшировать наблюдаемое в течение заданного времени (скажем, 1 минуту). Когда подписка осуществляется после указанного времени, данные должны снова извлекаться и снова кэшироваться на 1 минуту.

Пример ожидаемого результата:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:30: Request (8) => RETRIEVE data

Оператор shareReplay отлично работает для кэширования данных за заданное время, но я не могу повторно запустить его, когда это заданное время истекло.

Пример использования оператора shareRelay(1, 1000):

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => no response
T 01:15: Request (6) => no response
T 01:30: Request (7) => no response
T 02:30: Request (8) => no response

Ссылка выше пытается изменить это поведение, используя первый оператор, перехватывающий нулевые результаты. К сожалению, это не работает нормально, так как данные не кэшируются после первого раза.

Вот что я получил, используя статью по приведенной выше ссылке (следующее изображение описывает используемый код)

детали кода

Результат у меня есть:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
T 01:15: Request (6) => RETRIEVE data
T 01:30: Request (7) => RETRIEVE data
T 02:30: Request (8) => RETRIEVE data

Я также видел пример с оператором таймера, но в этом случае данные извлекаются каждую минуту, даже если на них нет подписки. Я не хочу обновлять данные каждую минуту, я хочу, чтобы срок действия кэша истекал каждую минуту. К сожалению, я потерял код с оператором таймера, но результат был примерно таким:

Результат с оператором таймера:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:00: NO REQUEST => RETRIEVE data
T 01:10: Request (5) => data from cache
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:00: NO REQUEST => RETRIEVE data
T 02:30: Request (8) => data from cache

У кого-нибудь есть «чистое» решение RxJS, чтобы делать то, что я хочу?


person Sébastien BATEZAT    schedule 02.06.2020    source источник
comment
Вы сказали, что не хотите обновлять данные каждую минуту, но истечение срока действия кеша каждую минуту ... что это значит. Можете ли вы сделать это более ясным?   -  person YogendraR    schedule 03.06.2020
comment
Последний пример показывает именно то, что я имею в виду. Если через 1 минуту запроса нет, кеш очищается, но данные не извлекаются автоматически (т. е. нет запроса в 01:00). Идея состоит в том, чтобы работал первый сценарий, а не последний.   -  person Sébastien BATEZAT    schedule 03.06.2020


Ответы (3)


Я думаю, что решение, на которое вы предоставили ссылку, имеет небольшую ошибку, как я пытался выделить в этом StackBlitz. (или я неправильно понял мысль)

Вы можете попробовать это:

const refetchSbj = new Subject();
const refetchData$ = refetchSbj.pipe(
    switchMap(() => service.fetchData())
  ).pipe(share());

merge(
  src$,
  refetchData$
).pipe(
  shareReplay(1, 1000),
  buffer(concat(timer(0), refetchData$)),
  tap(values => !values.length && refetchSbj.next()),
  filter(values => values.length !== 0),
  // In case there is only one value,
  map(([v]) => v),
  // Might want to add this, because each subscriber will receive the value emitted by the `shareReplay`
  take(1)
)

shareReplay внутренне использует ReplaySubject, который отправляет все кэшированные значения синхронно новому подписчику. timer(0) похож на setTimeout(fn, 0), но важным аспектом здесь является то, что он асинхронен, что позволяет buffer собирать значения, выдаваемые ReplaySubject.

buffer(concat(timer(0), refetchData$)), — мы хотим убедиться, что внутренний наблюдаемый объект, предоставленный buffer, не завершится, иначе весь поток будет завершен. refetchData$ выдаст только что извлеченные данные, когда это произойдет (мы увидим, когда это произойдет чуть позже).

tap(values => !values.length && refetchSbj.next()) - если значения не были выданы, это означает, что ReplaySubject в использовании не имеет никаких значений, а это означает, что время прошло. Если это так, с помощью refetchSbj мы можем повторно заполнить кеш.


Вот как мы можем визуализировать поток:

T 00:00: Request (1) => RETRIEVE data
1) `refetchSbj.next()`
2) shareReplay will send the value resulted from `service.fetchData()` to the subscriber
3) the newly fetched value will be added to the `buffer`, and then the `refetchData$` from `concat(timer(0), refetchData$)` will emit(this is why we've used `share()`), meaning that `values` will not be an empty array
4) take(1) is reached, the value will be sent to the subscriber and then it will complete, so the `ReplaySubject` from `shareReplay()` will have no subscribers.

T 00:10: Request (2) => data from cache
`values` will not be empty, so `refetchSbj` won't emit and `take(1)` will be reached

T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
Same as `Request (1)`
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:30: Request (8) => RETRIEVE data
person Andrei Gătej    schedule 03.06.2020
comment
Очень сложно понять (по крайней мере, для меня), но кажется, что это более чистый способ по сравнению с другими ответами. Спасибо - person Sébastien BATEZAT; 10.06.2020
comment
Спасибо! Я был бы рад подробнее остановиться на некоторых аспектах, если вы хотите - person Andrei Gătej; 10.06.2020

Я бы рассмотрел следующую стратегию.

Прежде всего вы создаете функцию createCachedSource, которая возвращает Observable с его кешем, реализованным через shareReplay(1).

Затем я бы использовал эту функцию, чтобы установить переменную source$, которую клиенты должны использовать для подписки на получение данных.

Теперь хитрость заключается в том, чтобы сбросить source$ с нужным интервалом, снова используя createCachedSource.

Все эти понятия в коде выглядят так

// this function created an Observable cached via shareReplay(1)
const createCachedSource = () =>
  of(1).pipe(
    tap((d) => console.log(`Go out and fetch ${d}`)),
    shareReplay(1),
    tap((d) => console.log(`Return cached data ${d}`))
  );

// source$ is the Observable the clients must subscribe to
let source$ = createCachedSource();

// every 1 sec reset the cache by creating a new Observable and setting it as value of source$
interval(1000)
  .pipe(
    take(10),
    tap(() => (source$ = createCachedSource()))
  )
  .subscribe();

// here we simulate 30 subscriptions to source$, one every 300 mS
interval(300)
  .pipe(
    take(30),
    switchMap(() => source$)
  )
  .subscribe();
person Picci    schedule 03.06.2020
comment
Спасибо за ответ. Мне не нравится идея подписаться на другого наблюдателя. Вот почему я не приму этот ответ и сосредоточусь на других. В любом случае спасибо за попытку! - person Sébastien BATEZAT; 10.06.2020

Observable const shared$ = data$.pipe(shareReplay(1, 1000)) будет кэшировать значение на 1000 мс. По истечении этого времени будущим подписчикам будут отправляться только полные уведомления (если data$ завершено). Проверяя, завершено ли shared$, вы знаете, что срок действия кеша истек, и вам нужно создать новый shared$ Observable для использования текущими и будущими подписчиками.

Вы можете использовать Observable более высокого порядка, чтобы предоставить эти shared$ Observables своим подписчикам.

const createShared = () => data$.pipe(shareReplay(1, 1000))
const sharedSupplier = new BehaviorSubject(createShared())

const cache = sharedSupplier.pipe(
  concatMap(shared$ => shared$.pipe(
    tap({ complete: () => sharedSupplier.next(createShared()) }),
  )),
  take(1) // emit once and complete so subscribers don't receive values from newly created shared Observables
)

https://stackblitz.com/edit/ggasua-2ntq7n


Используя операторы из изображения, которое вы разместили, вы также можете сделать:

const cache = sharedSupplier.pipe(
  concatMap(shared$ => shared$.pipe(
    first(null, defer(() => (sharedSupplier.next(createShared()), EMPTY))),
    mergeMap(d => isObservable(d) ? d : of(d))
  )),
  take(1)
)

Но это больше кода, а результат тот же.

person frido    schedule 10.06.2020