RxJS 5.0 делать пока как механизм

Я пытаюсь использовать RxJS для простого короткого опроса. Он должен делать запрос каждые delay секунд к местоположению path на сервере, заканчивая выполнением одного из двух условий: либо обратный вызов isComplete(data) возвращает true, либо он пытался использовать сервер более maxTries. Вот основной код:

newShortPoll(path, maxTries, delay, isComplete) {
    return Observable.interval(delay)
    .take(maxTries)
    .flatMap((tryNumber) => http.get(path))
    .doWhile((data) => !isComplete(data));
  }

Однако doWhile не существует в RxJS 5.0, поэтому условие, при котором он может только попробовать сервер maxTries, работает благодаря вызову take(), но условие isComplete не работает. Как я могу сделать так, чтобы наблюдаемые значения next() были до тех пор, пока isComplete не вернет true, после чего оно будет next() это значение и complete().

Должен отметить, что takeWhile() у меня здесь не работает. Он не возвращает последнее значение, которое на самом деле является самым важным, поскольку именно тогда мы знаем, что это сделано.

Спасибо!


person Colton Voege    schedule 02.03.2016    source источник
comment
Возможно дубликат: stackoverflow.com/questions/36007911/   -  person Brian Vanderbusch    schedule 26.04.2016
comment
Это не дубликат в том смысле, что вопрос требует замены doWhile.   -  person Bjorn    schedule 10.11.2016


Ответы (3)


Мы можем создать служебную функцию для создания второго Observable, который испускает каждый элемент, испускаемый внутренним Observable; однако мы вызовем функцию onCompleted, как только наше условие будет выполнено:

function takeUntilInclusive(inner$, predicate) {
    return Rx.Observable.create(observer => {
        var subscription = inner$.subscribe(item => {
            observer.onNext(item);

            if (predicate(item)) {
                observer.onCompleted();
            }
        }, observer.onError, observer.onCompleted);


        return () => {
            subscription.dispose();
        }
    });
}

А вот небольшой фрагмент с использованием нашего нового служебного метода:

const inner$ = Rx.Observable.range(0, 4);
const data$ = takeUntilInclusive(inner$, (x) => x > 2);
data$.subscribe(x => console.log(x));

// >> 0
// >> 1
// >> 2
// >> 3

Этот ответ основан на: RX Observable.TakeWhile проверяет условие ДО каждого элемента, но мне нужно выполнить проверку после

person Calvin Belden    schedule 04.03.2016
comment
Как ни странно, это решение работает примерно в половине случаев. Иногда последний пакет данных доходит до подписчиков, иногда просто нет. Есть идеи, почему? Переключение на switchMap тоже не помогло. - person Colton Voege; 05.03.2016
comment
Это не работает должным образом, потому что всегда вызывается onNext. Код внутри наблюдаемого должен выглядеть так: if (predicate(item)) {Observer.cemplete(); } иначе { наблюдатель.следующий(элемент); } - person Mário Kapusta; 18.06.2018

Этого можно добиться, используя retry и первые операторы.

// helper observable that can return incomplete/complete data or fail.
var server = Rx.Observable.create(function (observer) {
  var x = Math.random();

  if(x < 0.1) {
    observer.next(true);
  } else if (x < 0.5) {
    observer.error("error");
  } else {
    observer.next(false);
  }
  observer.complete();

  return function () {
  };
});
   
function isComplete(data) {
  return data;
}
  
var delay = 1000;
Rx.Observable.interval(delay)
  .switchMap(() => {
    return server
      .do((data) => {
        console.log('Server returned ' + data);
      }, () => {
        console.log('Server threw');
      })
      .retry(3);
  })
  .first((data) => isComplete(data))
  .subscribe(() => {
    console.log('Got completed value');
  }, () => {
    console.log('Got error');
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

person Sergey Sokolov    schedule 24.01.2017

Это старый вопрос, но мне также пришлось опросить конечную точку, и я пришел к этому вопросу. Вот мой собственный оператор doWhile, который я создал:

import { pipe, from } from 'rxjs';
import { switchMap, takeWhile, filter, map } from 'rxjs/operators';

export function doWhile<T>(shouldContinue: (a: T) => boolean) {
  return pipe(
    switchMap((data: T) => from([
      { data, continue: true },
      { data, continue: shouldContinue(data), exclude: true }
    ])),
    takeWhile(message => message.continue),
    filter(message => !message.exclude),
    map(message => message.data)
  );
}

Это немного странно, но это работает для меня до сих пор. Вы могли бы использовать его с take, как вы пытались.

person Michael Pearson    schedule 20.12.2018