24

First, this is the first project in which I am using RxJS; I thought I would learn it best by using it.

I found this answer: "Turning paginated requests into an Observable stream with RxJs". But it says in the comments:

You're exceeding the maximum call stack still. At around 430 pages returned. I think recursion might not be the best solution here

I want to query the YouTube Data API. The results come back in pages and I need to paginate through them. I imagined a workflow like this could work:
1) Initiate a call
2) Check if the response has a 'nextPageToken'
3) If it has, do another request to the YouTube API
4) If not, finish
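For reference, the four steps above can be written as a plain loop before bringing RxJS into it. This is only a sketch: `fetchPage`, `fakePages`, `allItems` and `collect` are hypothetical names, and the three fake pages stand in for real YouTube Data API responses.

```javascript
// Hypothetical stand-in for one YouTube Data API request.
// It serves three fake pages; a real call would send `pageToken` to the API.
const fakePages = {
  start: { items: [1, 2], nextPageToken: "p2" },
  p2: { items: [3, 4], nextPageToken: "p3" },
  p3: { items: [5] }, // last page: no nextPageToken
};

async function fetchPage(pageToken = "start") {
  return fakePages[pageToken];
}

// Steps 1-4 as a loop: request a page, yield its items,
// follow nextPageToken, and stop when it is absent.
async function* allItems() {
  let pageToken; // undefined means "first page"
  do {
    const page = await fetchPage(pageToken);
    yield* page.items;
    pageToken = page.nextPageToken;
  } while (pageToken);
}

// Convenience helper that drains the generator into an array.
async function collect() {
  const result = [];
  for await (const item of allItems()) result.push(item);
  return result;
}
```

Because the generator is pulled by its consumer, breaking out of the `for await` loop early stops further requests. There is also no recursion here, so the call stack does not grow with the number of pages.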

So to do this I could imagine the following Observables / streams:
FirstRequestStream -A-X--------------->
ResponseStream     -A-A-A-A--X-------->
RequestStream      -I-A-I-A----------->
A = Action
I = Info from upper stream
X = Termination

(Not sure if this diagram is correct the way I made it)

So the ResponseStream depends on FirstRequestStream and RequestStream (using the merge function). The RequestStream depends on the ResponseStream (is this called a circulating observable?).

- Is this the right approach?

- Are 'circulating observables' a good thing, and are they even possible? (I had problems creating one.)

- Is there any other way I should try first?

- Is it possible to create interdependent observable streams?

Thank you for your help.

jjuser19jj

4 Answers

36

You are overcomplicating this problem; it can be solved much more easily using defer.

The idea is to create a deferred observable (so it is created and starts fetching data only after subscription) and concatenate it with the same observable for the next page, which is in turn concatenated with the one for the page after that, and so on. And all of that can be done without recursion.

Here is how the code looks:

const { defer, from, concat, EMPTY, timer } = rxjs; // = require("rxjs")
const { mergeMap, take, mapTo, tap } = rxjs.operators; // = require("rxjs/operators")

// simulate network request
function fetchPage(page=0) {
  return timer(100).pipe(
    tap(() => console.log(`-> fetched page ${page}`)),
    mapTo({
      items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
      nextPage: page + 1,
    })
  );
}

const getItems = page => defer(() => fetchPage(page)).pipe(
  mergeMap(({ items, nextPage }) => {
    const items$ = from(items);
    const next$ = nextPage ? getItems(nextPage) : EMPTY;
    return concat(items$, next$);
  })
);

// process only first 30 items, without fetching all of the data
getItems()
 .pipe(take(30))
 .subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>
Oles Savluk
  • But as I understand it, this will make all the paginated calls even if no one is interested in the results, e.g. the user has not pressed the "next page" button or done "infinite scrolling". How can this logic be modified to allow something like that? –  Jun 27 '16 at 18:05
  • @torazaburo no this observable is deferred ("lazy"), so it will start making requests only after someone has subscribed to it, and stop when you unsubscribe. So if you "take" just several items from the stream, only needed pages will be fetched (not all of them), see the code examples in the answer. – Oles Savluk Jun 27 '16 at 18:29
  • I tried this solution in a [jsbin](http://jsbin.com/yeqimokode/2/edit?js,console,output) and it does fetch all the items at once. In my example I also zip the observable with a click event, and that didn't help – hasanain Jun 12 '18 at 21:59
  • If anyone is trying this with RxJs6, you need to import `concat` like this, `import { concat } from 'rxjs'`, not like this `import { concat } from 'rxjs/operators'` – smbergin79 Aug 16 '18 at 02:17
  • "And all of that can be done without recursion" From what I know calling fetchItems inside fetchItems IS a recursion. Can you explain? – kayo Aug 24 '18 at 10:25
  • @kayo as you can see `fetchItems` return some value right away, without waiting for result of other calls – Oles Savluk Aug 24 '18 at 17:04
  • @hasanain I have updated code snippet to rxjs v6 (sorry for the late reply) – Oles Savluk Aug 24 '18 at 23:14
  • This may be also a case for `expand` - see https://blog.angularindepth.com/rxjs-understanding-expand-a5f8b41a3602 – Picci Sep 12 '18 at 06:01
21

Here is my solution using the RxJS operators expand and reduce, together with the EMPTY constant and Angular's HttpClient module:

Suppose your API response is an object shaped like the following:

interface Response {
  data: items[]; // array of result items
  next: string|null; // url of next page, or null if there are no more items
}

You could use expand and reduce like so:

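import { EMPTY } from 'rxjs';
import { expand, reduce } from 'rxjs/operators';

// Method of a class that has HttpClient available as this.http
getAllResults(url) {
  return this.http.get(url).pipe(
    // keep requesting res.next until the API reports no further page
    expand((res) => res.next ? this.http.get(res.next) : EMPTY),
    // collect the data of every page into a single array
    reduce((acc, res) => acc.concat(res.data), [])
  );
}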
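To see what the reduce step produces, here is the same accumulator applied to a plain array. The three page objects are made-up sample data following the Response shape above, and `Array.prototype.reduce` only stands in for the RxJS operator, which applies the accumulator in the same way to each emitted response and emits the final value once the source completes.

```javascript
// Made-up sample responses, in the order expand would emit them.
const pages = [
  { data: ["a", "b"], next: "/items?page=2" },
  { data: ["c"], next: "/items?page=3" },
  { data: ["d", "e"], next: null }, // last page
];

// Same accumulator as in getAllResults: append each page's data to the running list.
const allData = pages.reduce((acc, res) => acc.concat(res.data), []);

console.log(allData); // [ 'a', 'b', 'c', 'd', 'e' ]
```

Note that nothing is emitted until the last page has arrived; dropping the reduce step would give one emission per page instead.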
slinhart
LuJaks
12

I shamelessly reuse the code snippet from Oles Savluk, with its good fetchPage function, and I apply the ideas explained in the blog article linked to by Picci (in the comments), using expand.

Article on expand by Nicholas Jamieson

It gives slightly simpler code, with the recursion hidden in the expand call (the comments on the article show how to linearize it, if needed).

const { timer, EMPTY } = rxjs; // = require("rxjs")
const { concatMap, expand, mapTo, tap, toArray } = rxjs.operators; // = require("rxjs/operators")

// simulate network request
const pageNumber = 3;
function fetchPage(page = 0) {
  return timer(1000).pipe(
    tap(() => console.log(`-> fetched page ${page}`)),
    mapTo({
      items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
      // computed without mutating `page`, so the log above reports the right page
      nextPage: page + 1 === pageNumber ? undefined : page + 1,
    }),
  );
}

const list = fetchPage().pipe(
  expand(({ nextPage }) => nextPage ? fetchPage(nextPage) : EMPTY),
  concatMap(({ items }) => items),
  // Transforms the stream of numbers (Observable<number>)
  // to a stream with only an array of numbers (Observable<number[]>).
  // Remove if you want a stream of numbers, not waiting for all requests to complete.
  toArray(),
);

list.subscribe(console.log);
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>
frido
PhiLho
  • Thanks for the link to Nicholas Jamieson's article - it had a solution for the exact problem I was trying to solve! – decocijo Jun 26 '19 at 06:54
0

LuJaks's answer is definitely the simplest approach!

For a one-line example, suppose you have a function that makes an HTTP request for a given page and returns a (partial) array of data. We call that function until the server returns an empty array:

import { Observable, EMPTY, of } from "rxjs";
import { expand, reduce } from "rxjs/operators";

// Mock a request that returns data for pages 0 to 5 only, then an empty array
function httpGet(p: number): Observable<number[]> {
  if (p > 5) { return of([]); }
  return of(new Array(10).fill(0).map((_, i) => p * 10 + i));
}

httpGet(0).pipe( // get the first page
    // index is the position of `value` in the output (0 for the first page), so index + 1 is the next page
    expand((value, index) => (value.length > 0 ? httpGet(index + 1) : EMPTY)), // other pages
    reduce((a, v) => [...a, ...v], [] as number[]), // optional if you want only one emit
  ).subscribe((x) => console.log(x));
Vincent GODIN