I am making calls to the Spotify API, which returns an object like this:

{
  next: 'https://api.spotify.com/v1/me/tracks?offset=100&limit=50',
  items: [...]
}

where items holds the results of the call just made and next is the URL to use to get the next 'page' of results. I would like to make an initial call with no offset, and then keep making network calls until next is null, which is what the API sends back when the user has no more items left to get.

This seems possible, but I can't figure out how to do it correctly. What I am looking for is something like this:

  readonly baseSpotifyUrl: string = 'https://api.spotify.com/v1';

  constructor(private http: HttpClient) {}

  public getTracks(url?: string): Observable<any> {
    return this.http.get(url && url?.length > 0 ? url : `${this.baseSpotifyUrl}/me/tracks?limit=50`);
  }

  public getAllTracks(): Observable<any> {
    const tracks$: Subject<any> = new Subject();
    let next: string = '';
    this.getTracks(next)
      .subscribe({
        next: (res: any): void => {
          tracks$.next(res.items);
          next = res.next;
          // if res.next !== null, do this again now that we have set next to res.next
        },
        error: (err: any): void => {
          console.error(err);
        }
      });
    return tracks$;
  }

The idea is that my component calls getAllTracks() and receives a subject, and new items are then pushed through that subject until all the items have been retrieved. I can't figure out how to make a new network request when the previous one returns ONLY IF there are more items to get (res.next !== null).

EDIT-----------------------------------------------------------

This gets the job done, but I feel that it's trash:

  public getAllTracksSegment(itemsSubject: Subject<any>, nextSubject: Subject<string>, url?: string): void {
    this.http.get(url && url?.length > 0 ? url : `${this.baseSpotifyUrl}/me/tracks?limit=50`).subscribe({
      next: (res: any): void => {
        itemsSubject.next(res.items);
        nextSubject.next(res.next);
      }
    });
  }

  public getAllTracks(): Observable<any> {
    const tracks$: Subject<any> = new Subject();
    const next$: Subject<string> = new Subject();
    next$.subscribe({
      next: (next: any): void => {
        if (next !== null) {
          this.getAllTracksSegment(tracks$, next$, next);
        }
      }
    });
    next$.next('');
    return tracks$;
  }
skyleguy
  • have tried takeWhile operator? – Aakash Garg Aug 06 '21 at 20:34
  • https://stackoverflow.com/questions/67487848/traversing-sequential-api-calls/67494088#67494088 – eko Aug 06 '21 at 23:06
  • @AakashGarg the http request observables close after one emission so takeWhile didnt seem to be working for them – skyleguy Aug 07 '21 at 00:41
  • @eko i seem to be getting closer with the accepted answer from that question. Ill look into it more soon but seems promising. kind of seems like it isnt returning the first set of results but it is making the correct number of calls – skyleguy Aug 07 '21 at 00:48

4 Answers

4

If I understand the issue right, I would use the expand operator to build a solution.

Here is the code I would use. Comments are inline.

// imports needed by this snippet
import { EMPTY, Observable } from 'rxjs';
import { expand, reduce, tap } from 'rxjs/operators';

public getTracks(url?: string): Observable<any> {
    return this.http.get(url && url?.length > 0 ? url : `${this.baseSpotifyUrl}/me/tracks?limit=50`);
}

public getAllTracks(): Observable<any[]> {
  // the first call is with no parameter so that the default url with no offset is used
  return this.getTracks().pipe(
     // expand is used to call getTracks recursively until next is null
     expand(data => data.next === null ? EMPTY : this.getTracks(data.next)),
     // with tap you can see the result returned by each call
     tap(data => console.log(data)),
     // if you want you can use the reduce operator to eventually emit the
     // accumulated array with all items
     reduce((acc, val) => [...acc, ...val.items], [] as any[])
  )
}

// now you can fire the execution of the recursive calls by subscribing
// to the observable returned by getAllTracks
getAllTracks().subscribe(
   // allItems is an array containing all the items returned by the various calls
   allItems => console.log(allItems)
)

ADDITIONAL EXPLANATIONS after the comments of @skyleguy

The tap operator is used to implement side effects. In other words it receives all the notifications from the upstream, does whatever it needs to do with the data notified and then passes downstream the same notification. There is no need to return anything from the function passed to the tap operator. The upstream is just passed downstream after the side effect is applied. In this example the side effect is simply the print on the console of the data passed with the notification.

The reduce used within the pipe is the reduce operator of RxJS, not the reduce method of Array. The RxJS reduce operator accumulates all the data notified from upstream and emits only one value, when upstream completes. So, in this example, every time the remote call returns something, that response enters the reduce operator and contributes to the accumulation logic. When the function passed to expand returns the EMPTY Observable, at the end of the recursion, EMPTY just completes without notifying anything, which means that upstream completes and therefore reduce can emit its first and only notification, i.e. the array with all items accumulated, and then complete.

This stackblitz replicates this logic with a simulation of the remote call.
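
For readers who want to try the recursion without a network, here is a minimal sketch of the same idea (it is not the linked stackblitz). The `Page` interface, the `pages` object and its keys are hypothetical stand-ins for the API responses, and `of()` replaces the real HTTP request, so everything runs synchronously.

```typescript
import { EMPTY, Observable, of } from 'rxjs';
import { expand, reduce } from 'rxjs/operators';

// Hypothetical page shape, modelled on the response shown in the question.
interface Page {
  next: string | null;
  items: string[];
}

// Hypothetical in-memory "API": three pages chained through `next`.
const pages: Record<string, Page> = {
  first: { next: 'second', items: ['a', 'b'] },
  second: { next: 'third', items: ['c', 'd'] },
  third: { next: null, items: ['e'] },
};

// Stand-in for the HTTP call: emits one page, then completes.
function getTracks(url: string = 'first'): Observable<Page> {
  return of(pages[url]);
}

function getAllTracks(): Observable<string[]> {
  return getTracks().pipe(
    // Call getTracks again for every emitted page until `next` is null.
    expand((page) => (page.next === null ? EMPTY : getTracks(page.next))),
    // Accumulate the items; emits once, when the recursion has completed.
    reduce((acc, page) => [...acc, ...page.items], [] as string[]),
  );
}

// Collect every emission so the single final value is easy to inspect.
const emissions: string[][] = [];
getAllTracks().subscribe((allItems) => emissions.push(allItems));

console.log(emissions); // [['a', 'b', 'c', 'd', 'e']]
```

Because reduce waits for completion, the subscriber is called exactly once, with the items of all three pages in order.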

Picci
  • yeah this is pretty clean. I'd probably benefit from a slight explanation of how it works though. I removed tap because it actually wasnt working for me with that there, nothing was happening until it was removed (probably because i wasnt returning data). I get how expand is working to make all of the network calls but does it return an array to be used with the reduce function? seems weird because when i hover it it says it returns: OperatorFunction<...>. or does it hit the reduce function every time a response from expand comes back – skyleguy Aug 08 '21 at 01:21
  • it does seem that making the tap return the data made it work, so makes sense. – skyleguy Aug 08 '21 at 01:25
  • Also, how is EMPTY working? I at first expected it to break on ...val.items when the EMPTY observable hit that block but it doesnt ever seem to actually hit that. When EMPTY is returned from the expand does it effectively close the observable? – skyleguy Aug 08 '21 at 01:29
  • I have added some additional explanations to the response. I hope these answer your doubts. – Picci Aug 08 '21 at 08:30
0

I came up with this solution using RxJS. I hope it helps; let me know if it works.

expand performs the recursive requests, and takeWhile stops the stream once next is null. reduce groups all your tracks into one array, so when you subscribe you receive all the tracks at once.

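  getAllTracks(): Observable<any> {
    return this.getAllTracksSegment().pipe(
      expand((res: any) => this.getAllTracksSegment(res.next)),
      // inclusive = true, so the last page (next === null) is still emitted before completing
      takeWhile((res) => !!res.next, true),
      // the API response holds the tracks in `items`
      reduce((total, current) => total.concat(current.items), [] as any[])
    ); // All tracks available when you subscribe
  }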

  getAllTracksSegment(url?: string): Observable<any> {
    return this.http.get(url && url?.length > 0 ? url : `${this.baseSpotifyUrl}/me/tracks?limit=50`);
  }

PS: If you wanna emit partial results, just remove the reduce operator and it will work
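
One detail worth checking with this approach: by default takeWhile does not emit the value that fails its predicate, so the last page (the one whose next is null) is dropped unless the optional inclusive flag is passed. A small sketch of the difference, using hypothetical in-memory pages (the `Page` interface and `pages` array are made up for illustration) instead of HTTP calls:

```typescript
import { from } from 'rxjs';
import { takeWhile } from 'rxjs/operators';

// Hypothetical page shape; only the last page has `next: null`.
interface Page {
  next: string | null;
  items: number[];
}

const pages: Page[] = [
  { next: 'page-2', items: [1, 2] },
  { next: 'page-3', items: [3, 4] },
  { next: null, items: [5] },
];

const exclusive: number[] = [];
const inclusive: number[] = [];

// Default behaviour: the first value that fails the predicate is dropped.
from(pages)
  .pipe(takeWhile((page) => page.next !== null))
  .subscribe((page) => exclusive.push(...page.items));

// With `inclusive` set to true, that value is emitted before completion.
from(pages)
  .pipe(takeWhile((page) => page.next !== null, true))
  .subscribe((page) => inclusive.push(...page.items));

console.log(exclusive); // [1, 2, 3, 4]
console.log(inclusive); // [1, 2, 3, 4, 5]
```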

Jorge Mussato
  • I guess it won't work in its current shape, because 1. With `EMPTY` there's no emission so the `Array` will be empty. 2. it doesn't compile: you're missing a `return` statement in both functions (+ `getAllTracksSegment` shouldn't be void) and `res/current` callbacks are inferred as `unknown` and so Typescript isn't happy with `res.next`/`current.tracks`. But yes, if you fix these things, it's a nice way to solve this problem :) – developer033 Aug 06 '21 at 23:11
  • Thanks for the feedback. I just fixed it. In those cases you can also suggest an edit for the answer. I compiled with another code I was using, consuming another endpoint. Important is to try using this logic for the problem, probably its the best approach. – Jorge Mussato Aug 06 '21 at 23:33
  • Unfortunately I can't suggest edits anymore, my edits are applied automatically, so I try to avoid them whenever possible (unless it's a minor typo, broken url, etc.) because some people prefer to correct their code by themselves. – developer033 Aug 06 '21 at 23:39
  • hmm, not sure if i did something wrong but nothing seems to happen at all. im definitely subscribing to the observable being returned by this function but i dont even see any network requests being made – skyleguy Aug 07 '21 at 00:40
  • @skyleguy Yes, the reason for this is mentioned in my first comment above (about `EMPTY`). – developer033 Aug 07 '21 at 01:09
  • Try it with "of" or even making the first http request – Jorge Mussato Aug 07 '21 at 01:50
0

While other answers mention takeWhile(), I would like to use it in a simpler, more declarative solution. One more thing: while I used Observable<any[]> (I'm assuming items is an array), I would strongly advise adding a type declaration for the object returned from the API.

public getAllTracks$: Observable<any[]>;
private nextUrl = new BehaviorSubject<string>('https://api.spotify.com/v1/me/tracks?limit=50');


constructor(private http: HttpClient) {
    this.getAllTracks$ = this.nextUrl.pipe(
        mergeMap(url => this.http.get<any>(url)),
        // inclusive = true, so the last page (next === null) is emitted before completing
        takeWhile(res => !!res.next, true),
        // only queue another request when there is a next URL
        tap(res => { if (res.next) { this.nextUrl.next(res.next); } }),
        scan((items, res) => [...items, ...res.items], [] as any[])
    );
}

First we define a BehaviorSubject that will emit what URL to call for each request, starting with the initial value. Then we define our observable in the constructor. No additional properties or methods needed.

getAllTracks$ subscribes to the BehaviorSubject and immediately receives the first url (the string we included in its declaration).

We use mergeMap() so that every URL emitted by the BehaviorSubject triggers its own API request, one after another. (If we used switchMap(), any new URL received from our BehaviorSubject would cancel any in-progress API call and "switch" to the new URL value.) I also find this more straightforward than using expand().

Then our takeWhile() checks whether the response has a non-null next property. If it does, the emission continues to the last two operators. Once next is null, takeWhile() completes the stream, which causes all subscribers to unsubscribe.

tap() is best used for external state updates. Because we have our next URL, we can send it to our BehaviorSubject, queueing the next API call.

Lastly, we use scan() to combine the data. The scan() operator works like reduce(), except that it emits the accumulated value after every source emission instead of only once on completion. It starts from an empty array and appends the items of each API response to it.
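
To make the difference between scan() and reduce() concrete, here is a minimal sketch. The `pages$` observable is a hypothetical stand-in for successive API responses, with each array playing the role of one page's items.

```typescript
import { of } from 'rxjs';
import { reduce, scan } from 'rxjs/operators';

// Hypothetical pages of items, standing in for successive API responses.
const pages$ = of(['a', 'b'], ['c'], ['d', 'e']);

const scanned: string[][] = [];
const reduced: string[][] = [];

// scan emits the running total after every page.
pages$
  .pipe(scan((all, page) => [...all, ...page], [] as string[]))
  .subscribe((all) => scanned.push(all));

// reduce emits only the final total, once the source completes.
pages$
  .pipe(reduce((all, page) => [...all, ...page], [] as string[]))
  .subscribe((all) => reduced.push(all));

console.log(scanned); // [['a', 'b'], ['a', 'b', 'c'], ['a', 'b', 'c', 'd', 'e']]
console.log(reduced); // [['a', 'b', 'c', 'd', 'e']]
```

With scan(), a component can render tracks as each page arrives; with reduce(), it only sees the complete list.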

Joshua McCarthy
0

While I love @Picci's accepted answer, I wanted to present a slightly adjusted version. This highlights that there are usually multiple ways to achieve the same thing with RxJS :).

public getTracks(url?: string): Observable<any> {
    return this.http
      .get(url?.length ? url : `${this.baseSpotifyUrl}/me/tracks?limit=50`)
      .pipe(
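        // Crucial: completes without emitting when `next` is null, which stops the recursive call.
        // Note that the page with `next === null` is dropped too, so its items are not accumulated.
        takeWhile((data: any) => data.next !== null),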
      );
  }

  public getAllTracks(): Observable<any[]> {
    // the first call is with no parameter so that the default url with no offset is used
    return this.getTracks().pipe(
      // expand is used to call getTracks recursively until next is null
      expand(data => this.getTracks(data.next)),
      // with tap you can see the result returned by each call
      tap(data => console.log(data)),
      // if you want you can use the reduce operator to eventually emit the
      // accumulated array with all items
      reduce((acc, val) => [...acc, ...val.items], [] as any[]),
    );
  }

// now you can fire the execution of the recursive calls by subscribing
// to the observable returned by getAllTracks
this.getAllTracks().subscribe(
   // allItems is an array containing all the items returned by the various calls
   allItems => console.log(allItems)
)

The key is takeWhile: it completes the stream as soon as its condition is no longer fulfilled. Both answers work great, though :).

Here is the adapted example: https://stackblitz.com/edit/rxjs-tcbdue?file=index.ts