
Are there any examples of making an arbitrary number of sequential, dependent http requests with cycle-http?

I want to fetch every page from an API where the next request can only be made using data in the current page.

I've tried to adapt this answer that uses Observable.merge(), but I'm not sure how to hook that up to cycle-http sources & sinks.

Robert K. Bell
  • I think the difficulty is if a cycle.js driver is used to make each subsequent request, then how to carry the data from each request forward to the next cycle? – bloodyKnuckles May 16 '16 at 15:17
  • `@cycle/fetch` takes arbitrary key/values that get carried through the driver cycle. Can be retrieved upon return on the `requests` property, but that gets hammered after `mergeAll`. – bloodyKnuckles May 16 '16 at 17:54

3 Answers


Here's another take on an arbitrary number of sequential, dependent requests using Cycle.js, and the @cycle/fetch driver.

(Using GitHub users API. The users query returns 30 users per page and the since URL parameter is a user id number and starts the query at the next user id.)
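That chaining can be sketched as a tiny URL builder (`nextUsersUrl` is a hypothetical helper, not part of the answer's code): the last user id on a full page becomes the `since` value of the next request.

```javascript
// Hypothetical helper (illustration only): each request starts where
// the previous full page left off, via the `since` query parameter.
const nextUsersUrl = (sinceId) =>
  `https://api.github.com/users?since=${sinceId}`;
```

For example, `nextUsersUrl(19473200)` builds the first request, and the 30th user's id on that page seeds the next one.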

First the primary part of the main function with comments:

const listResponse$ = sources.FETCH // response returned from FETCH driver
  .mergeAll()
  .flatMap(res => res.json())
  .scan(
    ((userstotals, users) =>
      [
        userstotals[0] + 1, // page count
        users[29] && users[29].id, // last id on full page
        userstotals[2].concat(users) // collect all users
      ]
    ),
    [0, undefined, []] // default accumulator
  )
  .share(); // allows stream split


// Branch #1 - cycle again for more pages
const listRequest$ = listResponse$
  .filter(users =>
    0 < users[1] && // full page id exists
    maxpages > users[0] // less than maxpages
  )
  .startWith('initial')
  .map(users =>
    `https://api.github.com/users?since=${
      (!isNaN(parseInt(users[1], 10)) && users[1]) || // last id full page
      idstart // default id start
    }`
  );


// Branch #2 - display
const dom$ = listResponse$
  .map(userstotals => div(JSON.stringify(userstotals[2])));

(This is an updated answer. I realized the scans can be combined into one.)

EXPLANATION: First pull the response from the sources property FETCH, flatten it, and pull out the JSON. Then scan to count how many pages have been queried so far; that count is later compared to maxpages so the predetermined limit is not exceeded. Next grab the last id of a full page, if one exists, and finally concat the current page of users onto the collection of pages accumulated so far. After accumulating the response information, share the stream so it can be split into two branches.
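The scan's accumulator step can be read as a pure function of (accumulator, page). A sketch under that reading (`step` and `PAGE_SIZE` are hypothetical names; the answer inlines this as an arrow function):

```javascript
// Hypothetical standalone version of the scan step above.
// Accumulator shape: [pageCount, lastIdOnFullPage, allUsersSoFar]
const PAGE_SIZE = 30; // the users endpoint returns 30 users per page

const step = ([count, , all], users) => [
  count + 1,                                       // page count
  users[PAGE_SIZE - 1] && users[PAGE_SIZE - 1].id, // undefined on a short page
  all.concat(users)                                // collect all users
];
```

On a short (final) page the middle slot becomes undefined, which is what later stops the cycle.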

The first branch re-cycles the query back through the FETCH driver to request more pages, but first filters on the last full-page id and the number of pages queried so far. If the id is not a number then the last page has been reached, and there are no more pages to query. The branch also stops once the number of pages queried reaches maxpages.
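Those two stopping conditions can be sketched as a standalone predicate (`shouldFetchMore` is a hypothetical name mirroring the filter in Branch #1):

```javascript
// Hypothetical predicate mirroring Branch #1's filter.
// acc is [pageCount, lastIdOnFullPage, allUsers]; since
// `0 < undefined` is false, a short (last) page stops the cycle.
const shouldFetchMore = (maxpages) => ([pageCount, lastId]) =>
  0 < lastId &&         // a full page's last id exists
  pageCount < maxpages; // stay under the page limit
```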

The second branch simply reaches into the accumulated response to get the full list of users, then passes the object through JSON.stringify and converts it to a virtual DOM object (the div method) to be sent to the DOM driver for display.


And here's the complete script:

import Cycle from '@cycle/rx-run';
import {div, makeDOMDriver} from '@cycle/dom';
import {makeFetchDriver} from '@cycle/fetch';

function main(sources) { // provides properties DOM and FETCH (evt. streams)

  const acctok = ''; // put your token here, if necessary
  const idstart = 19473200; // where do you want to start?
  const maxpages = 10;

  const listResponse$ = sources.FETCH
    .mergeAll()
    .flatMap(res => res.json())
    .scan(
      ((userstotals, users) =>
        [
          userstotals[0] + 1, // page count
          users[29] && users[29].id, // last id on full page
          userstotals[2].concat(users) // collect all users
        ]
      ),
      [0, undefined, []]
    )
    .share();

  const listRequest$ = listResponse$
    .filter(function (users) {
      return 0 < users[1] && maxpages > users[0];
    })
    .startWith('initial')
    .map(users =>
      `https://api.github.com/users?since=${
        (!isNaN(parseInt(users[1], 10)) && users[1]) || // last id full page
        idstart // default id start
      }` // append &access_token=${acctok} if necessary
    );

  const dom$ = listResponse$
    .map(userstotals => div(JSON.stringify(userstotals[2])));

  return {
    DOM: dom$,
    FETCH: listRequest$
  };
}

Cycle.run(main, {
  DOM: makeDOMDriver('#main-container'),
  FETCH: makeFetchDriver()
});

(My first answer, left for posterity. Notice the two scans.)

const listResponse$ = sources.FETCH
  .mergeAll()
  .flatMap(res => res.json())
  .scan(((userscount, users) =>              // <-- scan #1
    [userscount[0] + 1, users]), [0, []]
  )
  .share();

const listRequest$ = listResponse$
  .filter(function (users) {
    return users[1][29] && users[1][29].id &&
      maxpages > users[0];
  })
  .startWith('initial')
  .map(users =>
    `https://api.github.com/users?since=${
      (users[1][users[1].length-1] && users[1][users[1].length-1].id) ||
        idstart
      }`
  );

const dom$ = listResponse$
  .scan(function (usersall, users) {          // <-- scan #2
    usersall.push(users);
    return usersall;
  }, [])
  .map(res => div(JSON.stringify(res)));

By scanning once, up front, I then needed to grab the last id of a full page, if it exists, and store that in the accumulator.

bloodyKnuckles

It would be better if you provided a code example; however, the basic logic may look like this:

  1. Map the response stream to a request stream
  2. Start request stream with an initial request

Code would be like this:

function main (sources){
  const initialRequest = {
    url: 'http://www.google.com'
  };
  
  const request$ = sources.HTTP
    .filter(response$ => /* FILTER LOGIC GOES HERE */)
    .switch() // or you can use flatMap
    .map(response => /* MAP RESPONSE TO A NEW REQUEST */)
    .startWith(initialRequest);
  
  return {
    HTTP: request$
  };
}
erdal

So this is likely horribly overcomplicated and I should scrap it to properly try Erdal's answer, but here is what I've come up with...

usage

export default function app({HTTP}) {
  const {
    allPagesRequest$: staffPagesReq$,
    latestData$: staff$,
  } = getAllPages({url: '/staff', HTTP});

  // staff$ is converted to vdom...

  return /* sinks */ {
    DOM:  staffVdom$,
    HTTP: staffPagesReq$,
  }
}

implementation

const fetchNthPage = (optsIn) => {
  const opts = merge(
    {
      url:  '',
      page: 0,
      HTTP: undefined,
    }, optsIn
  );

  const u = new URI(opts.url)
    .setQuery({'_page': opts.page.toString()});

  const pageNResponse$ = opts.HTTP
    .filter(
      res$ => res$.request.url === urlForEndpoint(u)
    )
    .flatMap(
      res$ => res$.catch(
        err => Observable.of(
          {
            body: {'error in response:': err.toString()}
          }
        )
      )
    )
    .map(res => res.body)
    .take(1)
    .shareReplay(1);

  return Observable.of({
    pageNRequest$:  Observable.of(basicRequestObject(u)),
    pageNResponse$: pageNResponse$,
    opts:           opts
  });
};


const encapsulateAs = typeName => data => {
  return {type: typeName, data}
};


const fetchAllPagesIndividually = (optsIn) => {
  const opts = merge(
    {
      url:  '',
      page: 0,
      HTTP: undefined,
    },
    optsIn
  );

  return Observable.defer(
    () => fetchNthPage(opts)
      .flatMap(x => {
        const mergedItems$ = Observable
          .merge(
            x.pageNRequest$.map(encapsulateAs('request')),
            x.pageNResponse$.map(encapsulateAs('response'))
          );


        const optsForNextPage = merge(opts, {
          page: opts.page + 1
        });

        const next$ = Observable
          .never() // `next$` shouldn't end when `pageNResponse$` does
          .merge(x.pageNResponse$)
          .shareReplay(1)
          .takeWhile(res => {
            //noinspection UnnecessaryLocalVariableJS
            let isFullPage = path(['response', 'length'], res) === apiPageSize;
            return isFullPage;
          })
          .flatMap(() => {
            return fetchAllPagesIndividually(optsForNextPage)
          });

        //noinspection UnnecessaryLocalVariableJS
        const concattedItem$ = Observable
          .concat(
            mergedItems$,
            next$
          );

        return concattedItem$
      })
      .shareReplay(1)
  );
};


const concatPages = (acc, currentVal, index, source) => acc.concat(currentVal);

const typeIs = typeStr => compose(
  equals(typeStr),
  prop('type')
);

const typeNotIn = typesArray => compose(
  not,
  unary(flip(contains)(typesArray)),
  prop('type')
);

const getAllPages = (optsIn) => {
  const f$ = fetchAllPagesIndividually(optsIn)
    .shareReplay(1);

  const allPagesRequest$ = f$
    .filter(typeIs('request'))
    .map(prop('data'));

  const allPagesResponse$ = f$
    .filter(typeIs('response'))
    .map(prop('data'));

  const theRest$ = f$
    .filter(typeNotIn(['request', 'response', 'data']));

  const latestData$ = allPagesResponse$
    .map(prop('response'))
    .scan(concatPages);

  return {
    allPagesRequest$,
    allPagesResponse$,
    latestData$,
    theRest$,
  }
};

compose(), not(), merge(), unary(), etc. are from Ramda.
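For readers without Ramda, the two type predicates above behave roughly like these plain functions (a sketch of the behavior, not the Ramda implementations):

```javascript
// Plain-JS sketches of the Ramda-based predicates used above.
const typeIs = typeStr => item => item.type === typeStr;
const typeNotIn = typesArray => item => !typesArray.includes(item.type);
```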

Robert K. Bell