Интервал RxJs с takeUntil для публикации последнего значения

У меня есть код, который опрашивает до тех пор, пока задача не будет завершена

Смотри ниже

this.simulationStatus =
  interval(2000).pipe(
    switchMap(
      () => from(this.simulationService.getSimulationStatus(this.route.snapshot.paramMap.get('jobId')))),
    takeUntil(this.stopPoll),
    tap(simulation => {
      if (simulation && simulation.complete) {
        if (this.stopCount == 1) {
          // Get once after complete
          this.stopPoll.next(true);
        }
        this.stopCount++;
      }
    })
  );

Я пробовал использовать takeUntil и takeWhile, проблема в том, что последнее значение никогда не публикуется после завершения задачи.

Чтобы обойти это, я должен включить метод tap в тему stopPoll и увеличить значение stopCount, чтобы получить последнее значение.

Итак, вышеизложенное работает, но кажется немного беспорядочным, я уверен, что должен быть лучший способ добиться этого?

Я бы ожидал, что takeUntil опубликует последнее значение или будет иметь переопределение, чтобы сообщить ему, например, takeUntil(observable, {publishLast: true})

Кстати, обновление, наблюдаемое подписывается шаблоном Angular 6. Спасибо заранее.


person Lenny D    schedule 09.08.2018    source источник


Ответы (3)


Одна вещь, которую вы можете сделать, это использовать собственный оператор типа takeWhile, например:

const completeWith = <T>(predicate: (arg: T) => boolean) => (
  source: Observable<T>,
) =>
  new Observable<T>(observer =>
    source.subscribe(
      value => {
        observer.next(value);
        if (predicate(value)) {
          observer.complete();
        }
      },
      error => observer.error(error),
      () => observer.complete(),
    ),
  );

Не кажется хорошей идеей рассматривать его как разновидность takeWhite, потому что он не просто принимает значения, пока выполняется условие, но также выдает дополнительное значение.

Возможно, более элегантным решением было бы сделать наблюдаемый статус симуляции генерирующим два типа значений: следующие уведомления и уведомления о завершении, аналогично тому, как работают операторы материализации/дематериализации.

person Ivan    schedule 10.08.2018
comment
Выглядит хорошо, я, вероятно, буду использовать это. Я согласен с вашими комментариями относительно названия takeWhile. Однако я использую takeUntil, и я думаю, что, учитывая его имя, имеет смысл, что у него есть опция перегрузки для выдачи последнего значения. Глядя на StackOverflow, другие наверняка сталкивались с этой ситуацией и нуждались в встроенной функциональности. - person Lenny D; 10.08.2018
comment
Самое элегантное решение, которое я нашел. Спасибо! - person lord5et; 09.04.2021

Вы также можете создать тему и испустить, используя next(), если хотите завершить наблюдаемое.

this.stopPoll: Subject<any> = new Subject<any>();

Если вы хотите сделать, завершите подписку. вы можете вызвать this.stopPoll.next(true);

вы можете получить доступ к данным в subscribe()

this.simulationStatus.subscribe(success=>{}, failure=>{}, complete=>{});
person Suresh Kumar Ariya    schedule 09.08.2018
comment
Да именно этим я и занимаюсь. и вызов this.stopPoll.next(true) в методе tap, но у меня есть 7 даже 9 дополнительных строк кода, включая объявления stopCount и stopPoll только потому, что последнее значение не публикуется по умолчанию. - person Lenny D; 09.08.2018
comment
Вы можете получить последнее значение в методе complete. т.е. третьи варианты подписки() - person Suresh Kumar Ariya; 09.08.2018
comment
Кроме того, это угловой шаблон, который подписывается на наблюдаемый, вероятно, следует упомянуть и об этом, но да, я могу подписаться вручную, это может быть чище. - person Lenny D; 09.08.2018
comment
Подписка на complete => {} будет работать, но я теряю асинхронную привязку к своему шаблону, поэтому мне нужно снова отправить ее на другую тему, к которой привязан шаблон. Так что все еще немного грязно :/ ! Либо так, либо потеряйте OnPush changeDetectionStrategy, который становится неэффективным. - person Lenny D; 09.08.2018
comment
Нашел это с помощью takeWhile stackoverflow.com /questions/44641246/ все решения взломаны. Я бы предположил, что в rxjs отсутствует функциональность. - person Lenny D; 09.08.2018
comment
Да, поднимет вопрос Бен Леш RxJS Github. - person Suresh Kumar Ariya; 09.08.2018
comment
Только что создал эту проблему github.com/ReactiveX/rxjs/issues/4000, точка его при этом. Но спасибо за ваше время и усилия! - person Lenny D; 09.08.2018

Тем временем это было реализовано в rxjs как takeWhile(condition, ?inclusive):

timer(0, 10).pipe(
    takeWhile((x) => x < 3, true)
)

испускает 0, 1, 2, 3

person vbraun    schedule 23.06.2021