Как мне перебирать функции, которые возвращают наблюдаемые rxjs

Я хочу перебрать серию асинхронных функций и завершить итерацию, когда возвращается false. Я новичок в rxjs и не могу заставить работать описанный ниже вариант использования. Я чувствую, что не понимаю чего-то фундаментального. Может ли кто-нибудь указать мне на это?

function validateA(): Observable<any> {
  // do stuff.
  return of({ id: "A", result: true }); // hardcoding result for now
}

function validateB(): Observable<any> {
  // do stuff
  return of({ id: "B", result: true }); // hardcoding result for now
}

function validateC(): Observable<any> {
  // do stuff
  return of({ id: "C", result: false });// hardcoding result for now
}

from([validateA, validateB, validateC])
  .pipe(
    map(data => data()),
    takeWhile(data => !!data.result)
  )
  .subscribe(data => console.log(`${data.id} passed!`));

https://stackblitz.com/edit/typescript-ub9c5r?file=index.ts&devtoolsheight=100


person user3508264    schedule 19.09.2020    source источник


Ответы (2)


Я бы сказал, что суть вашей логики верна. Чего не хватает, так это некоторой особенности rxJ.

Решения могут быть примерно такими. Объяснение нюансов в комментариях.

// start from an array of functions and turn it into a stream using RxJs from function
from([validateA, validateB, validateC])
  .pipe(
    // now execute each function sequentially, one after the other, via concatMap
    // operator. This operator calls each function and each function returns an Observable
    // concatMap ensures that the functions are called sequentially and also that the returned Observable (because each function returns an Observable)
    // is "flattened" in the result stream. In other words, you execute each function one at the time
    // and return the value emitted by the Observable returned by that function
    // until that Observable completes. Considering that you use the "of" function to
    // create the Observable which is returned by each function, such Observable emits just one value and then completes.
    concatMap(func => func()),
    // now you have a stream of values notified by the Observables returned by the functions
    // and you terminate as soon as a flase is received
    takeWhile(data => !!data.result)
  )
  .subscribe(data => console.log(`${data.id} passed!`));
person Picci    schedule 19.09.2020

Следующее, кажется, делает свое дело и лениво вызывает функции:

https://stackblitz.com/edit/typescript-9ystxv?file=index.ts

import {  from, Observable, of } from "rxjs";
import {  concatAll, find, map } from "rxjs/operators";

function validateA() {
  console.log('validateA');
  return of({ id: "A", result: false });
}

function validateB() {
  console.log('validateB');
  return of({ id: "B", result: true });
}

function validateC() {
  console.log('validateC');
  return of({ id: "C", result: false });
}

from([validateA, validateB, validateC])
  .pipe(
    map(validate => validate()),
    concatAll(),
    find(data => data.result)
  )
  .subscribe(data => console.log(`${data.id} passed!`));
person Ivan    schedule 19.09.2020