Как мне правильно расширить rxjs/Observable с помощью моего первого пользовательского оператора

Я пытаюсь использовать шаблон повтора в вызовах службы (на самом деле: @Effects в ngrx/store) с увеличенными интервалами задержки. Поскольку мне удалось придумать рабочий код для одного вызова (даже если он выглядит неоптимизированным, я не хочу сосредотачиваться на этом в своем вопросе), теперь я хотел бы извлечь его в пользовательский оператор Observable и использовать его повторно. во всех моих служебных вызовах.

Я ничего не знаю о том, как разработать API/использование для нового оператора и как сделать его распознаваемым TypeScript.

Приведенный ниже код точно не работает, потому что, вероятно, накапливает множество проблем.

Итак, теперь у меня есть вызов/эффект следующим образом:

  @Effect()
  loadData$: Observable<Action> = this.actions$
    .ofType(ActionTypes.LOAD_DATA)
    .pluck('payload')
    .switchMap(params => {
      return this.myService.getData(params)
        .map(res => new LoadDataCompleteAction(res))

        // ...and this part would have to be extracted: 
        .retryWhen(attempts => Observable
          .zip(attempts, Observable.range(1, 5))
          .flatMap((n, i) => {
            if (i < 4) {
              return Observable.timer(1000 * i);
            } else {
              throw(n);
            }
          })
        )
    })
    .catch(err => Observable.of(new LoadDataFailed()));

и то, что мне нужно, - это возможность повторно использовать часть повтора в других эффектах и ​​иметь шаблон, аналогичный приведенному ниже:

  @Effect()
  loadData$: Observable<Action> = this.actions$
    .ofType(ActionTypes.LOAD_DATA)
    .pluck('payload')
    .switchMap(params => {
      return this.myService.getData(params)
        .map(res => new LoadDataCompleteAction(res))
        .retryWhen(attempts => Observable.retryOrThrow(attempts, maxAttempts)

         // or maybe - that's my design question
         .retryOrThrow(attempts, maxAttempts)
    })
    .catch(err => Observable.of(new LoadDataFailed()));

Для простоты можно предположить, что шаблон обратного вызова задержки (i * 1000) будет постоянным для всего приложения.

Приведенный ниже код - моя попытка, но она явно не работает.

declare module 'rxjs/Observable' {
  interface Observable<T> {
    retryOrThrow<T>(attempts: any, max: number): Observable<T>;
  }     
}

Observable.prototype.retryOrThrow = function(attempt, max) {
  console.log('retryOrThrow called');

  return Observable.create(subscriber => {
    const source = this;
    const subscription = source.subscribe(() => {
        // important: catch errors from user-provided callbacks
        try {
          subscriber
            .zip(attempt, Observable.range(1, max + 1))
            .flatMap((n, i) => {
              console.log(n, i);
              if (i < max) {
                return Observable.timer(1000 * i);
              } else {
                throw(n);
              }
            });
        } catch (err) {
          subscriber.error(err);
        }
      },
      // be sure to handle errors and completions as appropriate and send them along
      err => subscriber.error(err),
      () => subscriber.complete());

    // to return now
    return subscription;
  });
};
  1. Я не уверен, как спроектировать API для нового оператора, какой синтаксис здесь подойдет лучше всего.
  2. Я не знаю, как правильно объявить новый оператор и пространство имен или модуль Observable, чтобы TypeScript распознавал новые вещи.

Обновленный сервисный вызов:

  getMocky(){
    const u = Math.random();

    const okUrl = 'http://www.mocky.io/v2/58ffadf71100009b17f60044';
    const erUrl = 'http://www.mocky.io/v2/58ffae7f110000ba17f60046';
    return u > 0.6 ? this.http.get(okUrl) : this.http.get(erUrl);
  }

person user776686    schedule 23.04.2017    source источник


Ответы (1)


Я не могу ответить на ваш вопрос напрямую, так как не знаю, как расширить rxjs с помощью пользовательского оператора.

К счастью, в вашем случае вам (и мне) это знать не нужно. На самом деле вы спрашиваете, как предварительно определить цепочку операторов для повторного использования в нескольких местах.

Все, что вам нужно, это заголовок let-Operator, и его довольно просто использовать.

Сначала извлеките логику, которую вы хотите повторно использовать, в функцию, которая возвращает наблюдаемое:

function retryOrThrow(maxAttempts, timeout) {
  return (source) =>
    source
      .retryWhen(attempts => Observable
        .zip(attempts, Observable.range(1, maxAttempts + 1))
        .flatMap((n, i) => {
          if (i < maxAttempts) {
            return Observable.timer(timeout * i);
          } else {
            throw (n);
          }
        })
      );
}

Используйте функцию в своем эффекте, используя let:

@Effect()
loadData$: Observable<Action> = this.actions$
  .ofType(ActionTypes.LOAD_DATA)
  .pluck('payload')
  .switchMap(params => {
    return this.myService.getData(params)
      .map(res => new LoadDataCompleteAction(res))
      // inject the pre-defined logic with the desired parameters
      .let(retryOrThrow(5, 1000))
    })
    .catch(err => Observable.of(new LoadDataFailed()));

Самый простой способ понять, что делает let, — посмотреть его исходный код. Это действительно просто, потому что все, что он делает, это применяет заданную функцию к наблюдаемому источнику.

person mtx    schedule 23.04.2017
comment
Это великолепно! Большое спасибо! Интересно, почему они не упоминают этот простой метод на этой странице github. .com/ReactiveX/rxjs/blob/master/doc/operator-creation.md - person user776686; 24.04.2017
comment
На самом деле, с этим кодом есть проблема — при подключении к основной цепочке Эффектов он, кажется, никогда не сбрасывает количество попыток. При подключении к служебному вызову он фактически никогда не повторяет вызов. Я постараюсь изолировать случай. Но по идее - я все равно поддерживаю ответ как правильный. - person user776686; 25.04.2017
comment
Не могли бы вы попробовать поймать сразу после let, а не за пределами switchMap? Я думаю, что ваш наблюдаемый эффект умирает после того, как он попадает в бросок внутри let. - person mtx; 25.04.2017
comment
@nuton Я только что попробовал это на более простом примере. Я думаю, проблема в том, что вы catch на самом наблюдаемом эффекте. С switchMap(...).catch(...) наблюдаемый эффект завершается после catch. Если вы catch внутри switchMap, наблюдаемая цепочка эффектов останется живой. Попробуйте связать это так: .switchMap(serviceCall.map(...).let(...).catch(...)). - person mtx; 25.04.2017
comment
Я обновил свой пост, указав сервисный вызов, с которым я тестирую цепочку. Я играл с ним в течение получаса или около того, и кажется, что, несмотря на использование случайного семени, URL, выбранный в начале цепочки, остается постоянным на протяжении всех попыток. У меня не было случая, чтобы серия закончилась на середине. Это либо серия всех неудач, либо один успешный вызов. - person user776686; 25.04.2017
comment
Вышеизложенное, конечно, не означает, что ваш код неверен, потому что вы обычно не переключаетесь между разными конечными точками. Я просто хотел бы понять, что происходит за кулисами, что выбранный URL-адрес на самом деле липкий. - person user776686; 25.04.2017