Выполнение произвольного количества последовательных зависимых запросов с помощью `cycle-http`

Есть ли примеры выполнения произвольного количества последовательных зависимых http-запросов с помощью cycle-http?

Я хочу получить каждую страницу из API, где следующий запрос может быть сделан только с использованием данных на текущей странице.

Я пытался адаптировать этот ответ, в котором используется Observable.merge(), но я Я не уверен, как подключить это к cycle-http источникам и приемникам.

Рекомендации


person Robert K. Bell    schedule 15.05.2016    source источник
comment
Я думаю, что сложность заключается в том, что если для выполнения каждого последующего запроса используется драйвер cycle.js, то как перенести данные из каждого запроса в следующий цикл?   -  person bloodyKnuckles    schedule 16.05.2016
comment
@cycle/fetch принимает произвольные ключи/значения, которые передаются через цикл драйвера. Можно получить при возврате свойства requests, но оно забивается после mergeAll.   -  person bloodyKnuckles    schedule 16.05.2016


Ответы (3)


Вот еще один пример произвольного количества последовательных зависимых запросов с использованием Cycle.js и драйвера @cycle/fetch.

(Используя GitHub API пользователей. Запрос пользователей возвращает 30 пользователей на страницу, а параметр since URL представляет собой идентификатор пользователя. число и начинает запрос с следующего идентификатора пользователя.)

Сначала основная часть функции main с комментариями:

const listResponse$ = sources.FETCH // response returned from FETCH driver
  .mergeAll()
  .flatMap(res => res.json())
  .scan(
    ((userstotals, users) =>
      [
        userstotals[0] + 1, // page count
        users[29] && users[29].id, // last id on full page
        userstotals[2].concat(users) // collect all users
      ]
    ),
    [0, undefined, []] // default accumulator
  )
  .share(); // allows stream split


// Branch #1 - cycle again for more pages
const listRequest$ = listResponse$
  .filter(users =>
    0 < users[1] && // full page id exists
    maxpages > users[0]; // less than maxpages
  )
  .startWith('initial')
  .map(users =>
    `https:\/\/api.github.com/users?since=${
      (!isNaN(parseInt(users[1], 10)) && users[1]) || // last id full page
      idstart // default id start
    }`
  );


// Branch #2 - display
const dom$ = listResponse$
  .map(userstotals => div(JSON.stringify(userstotals[2])));

(Это обновленный ответ. Я понял, что scan можно объединить в один.)

ОБЪЯСНЕНИЕ. Сначала извлеките ответ из свойства sources FETCH, сгладьте его и извлеките JSON, затем scan, чтобы подсчитать, сколько страниц было запрошено на данный момент. Количество запрошенных страниц позже сравнивается с maxpages, чтобы не превысить заданное число. Затем получите последнюю id полной страницы, если она существует, и последнюю, concat текущую страницу пользователей с коллекцией пользовательских страниц, накопленных на данный момент. После накопления информации об ответе share поток можно разделить на две ветви.

Первая ветвь используется для повторного прохождения запроса через драйвер FETCH для запроса дополнительных страниц. Но сначала filter проверить последнюю страницу и количество запрошенных страниц. Если идентификатор не является числом, то последняя страница была достигнута. Не продолжайте, если последняя страница уже достигнута и, следовательно, нет больше страниц для запроса. Также не продолжайте, если количество запрошенных страниц превышает значение maxpages.

Вторая ветвь просто обращается к накопленному ответу, чтобы получить полный список пользователей, затем JSON.stringifys объект и преобразует его в виртуальный объект dom (метод div) для отправки драйверу DOM для отображения.


А вот и полный скрипт:

import Cycle from '@cycle/rx-run';
import {div, makeDOMDriver} from '@cycle/dom';
import {makeFetchDriver} from '@cycle/fetch';

function main(sources) { // provides properties DOM and FETCH (evt. streams)

  const acctok = ''; // put your token here, if necessary
  const idstart = 19473200; // where do you want to start?
  const maxpages = 10;

  const listResponse$ = sources.FETCH
    .mergeAll()
    .flatMap(res => res.json())
    .scan(
      ((userstotals, users) =>
        [
          userstotals[0] + 1, // page count
          users[29] && users[29].id, // last id on full page
          userstotals[2].concat(users) // collect all users
        ]
      ),
      [0, undefined, []]
    )
    .share();

  const listRequest$ = listResponse$
    .filter(function (users) {
      return 0 < users[1] && maxpages > users[0];
    })
    .startWith('initial')
    .map(users =>
      `https:\/\/api.github.com/users?since=${
        (!isNaN(parseInt(users[1], 10)) && users[1]) || // last id full page
        idstart // default id start
      }` //&access_token=${acctok}`
    );

  const dom$ = listResponse$
    .map(userstotals => div(JSON.stringify(userstotals[2])));

  return {
    DOM: dom$,
    FETCH: listRequest$
  };
}

Cycle.run(main, {
  DOM: makeDOMDriver('#main-container'),
  FETCH: makeFetchDriver()
});

(Мой первый ответ, оставленный для потомков. Обратите внимание на два scan.)

const listResponse$ = sources.FETCH
  .mergeAll()
  .flatMap(res => res.json())
  .scan(((userscount, users) =>              // <-- scan #1
    [userscount[0] + 1, users]), [0, []]
  )
  .share();

const listRequest$ = listResponse$
  .filter(function (users) {
    return users[1][29] && users[1][29].id &&
      maxpages > users[0];
  })
  .startWith('initial')
  .map(users =>
    `https://api.github.com/users?since=${
      (users[1][users[1].length-1] && users[1][users[1].length-1].id) ||
        idstart
      }`
  );

const dom$ = listResponse$
  .scan(function (usersall, users) {          // <-- scan #2
    usersall.push(users);
    return usersall;
  }, [])
  .map(res => div(JSON.stringify(res)));

С помощью scaning один раз, мне нужно было получить полный последний идентификатор страницы, если он существует, и сохранить его в аккумуляторе.

person bloodyKnuckles    schedule 17.05.2016

будет лучше, если вы предоставите пример кода. однако основная логика может быть такой:

  1. Сопоставьте поток ответа с потоком запроса
  2. Начать поток запросов с начального запроса

Код будет таким:

function main (sources){
  const initialRequest = {
    url: 'http://www.google.com'
  };
  
  const request$ = sources.HTTP
  .filter(response$ => /*FILTER LOGIC GOES HERE */)
  .switch()//or you can use flatMap
  .map(response =>/* MAP RESPONSE TO A NEW REQUEST */)
  .startWith(initialRequest);
  
  return {
    HTTP: request$
  };
}

person erdal    schedule 16.05.2016

Так что это, вероятно, ужасно сложно, и я должен отказаться от него, чтобы правильно попробовать ответ Эрдала, но вот что я придумал с...

использование

export default function app({HTTP}) {
  const {
    allPagesRequest$: staffPagesReq$,
    latestData$: staff$,
  } = getAllPages({url: '/staff', HTTP});

  // staff$ is converted to vdom...

  return /* sinks */ {
    DOM:  staffPagesReq$,
    HTTP: staffVdom$,
  }
}

реализация

const fetchNthPage = (optsIn) => {
  const opts = merge(
    {
      url:  '',
      page: 0,
      HTTP: undefined,
    }, optsIn
  );

  const u = new URI(opts.url)
    .setQuery({'_page': opts.page.toString()});

  const pageNResponse$ = opts.HTTP
    .filter(
      res$ => res$.request.url === urlForEndpoint(u)
    )
    .flatMap(
      res$ => res$.catch(
        err => Observable.of(
          {
            body: {'error in response:': err.toString()}
          }
        )
      )
    )
    .map(res => res.body)
    .take(1)
    .shareReplay(1);

  return Observable.of({
    pageNRequest$:  Observable.of(basicRequestObject(u)),
    pageNResponse$: pageNResponse$,
    opts:           opts
  });
};


const encapsulateAs = typeName => data => {
  return {type: typeName, data}
};


const fetchAllPagesIndividually = (optsIn) => {
  const opts = merge(
    {
      url:  '',
      page: 0,
      HTTP: undefined,
    },
    optsIn
  );

  return Observable.defer(
    () => fetchNthPage(opts)
      .flatMap(x => {
        const mergedItems$ = Observable
          .merge(
            x.pageNRequest$.map(encapsulateAs('request')),
            x.pageNResponse$.map(encapsulateAs('response'))
          );


        const optsForNextPage = merge(opts, {
          page: opts.page + 1
        });

        const next$ = Observable
          .never() // `next$` shouldn't end when `pageNResponse$` does
          .merge(x.pageNResponse$)
          .shareReplay(1)
          .takeWhile(res => {
            //noinspection UnnecessaryLocalVariableJS
            let isFullPage = path(['response', 'length'], res) === apiPageSize;
            return isFullPage;
          })
          .flatMap(() => {
            return fetchAllPagesIndividually(optsForNextPage)
          });

        //noinspection UnnecessaryLocalVariableJS
        const concattedItem$ = Observable
          .concat(
            mergedItems$,
            next$
          );

        return concattedItem$
      })
      .shareReplay(1)
  );
};


const concatPages = (acc, currentVal, index, source) => acc.concat(currentVal);

const typeIs = typeStr => compose(
  equals(typeStr),
  prop('type')
);

const typeNotIn = typesArray => compose(
  not,
  unary(flip(contains)(typesArray)),
  prop('type')
);

const getAllPages = (optsIn) => {
  const f$ = fetchAllPagesIndividually(optsIn)
    .shareReplay(1);

  const allPagesRequest$ = f$
    .filter(typeIs('request'))
    .map(prop('data'));

  const allPagesResponse$ = f$
    .filter(typeIs('response'))
    .map(prop('data'));

  const theRest$ = f$
    .filter(typeNotIn(['request', 'response', 'data']));

  const latestData$ = allPagesResponse$
    .map(prop('response'))
    .scan(concatPages);

  return {
    allPagesRequest$,
    allPagesResponse$,
    latestData$,
    theRest$,
  }
};

compose(), not(), merge(), unary() и т. д. взяты из Ramda.

person Robert K. Bell    schedule 17.05.2016