0

My component needs to check whether some app preferences are set before API requests are made. Right now I have set it up like this, where I keep my component's data updated on a timer of 2 minutes:

ngOnInit(): void {
    this.subscription = timer(0, 120 * 1000).subscribe(() => {
        this.shopService.getPreferencesAsObservable().subscribe(preferences => {
            if(preferences) {
                this.getInitialPendingSlotsOrders();
                this.getInitialPendingNoSlotsOrders();
            }
        });
    });
}

getInitialPendingSlotsOrders(){
    this.apiService.fetchShopOrders("status=PENDING&only_slots=true").subscribe(orders=> {
        /* do stuff with orders */
        /* it can happen that I need to get another page of data */
        if(orders.next_page){
            this.getNextSlotsSetOfPendingOrders();
        }
    });
}

getInitialPendingNoSlotsOrders(){
    this.apiService.fetchShopOrders("status=PENDING").subscribe(orders=> {
        /* do stuff with orders */
        /* it can happen that I need to get another page of data */
        if(orders.next_page){
            this.getNextNoSlotsSetOfPendingOrders();
        }
    });
}

getNextSlotsSetOfPendingOrders() { 
    this.apiService.fetchNextSetOfOrders(this.nextSlotsUrl).subscribe(nextSetOfOrders => {
        /* do stuff with next set of orders */
    })
}

getNextNoSlotsSetOfPendingOrders() { 
    this.apiService.fetchNextSetOfOrders(this.nextNoSlotsUrl).subscribe(nextSetOfOrders => {
        /* do stuff with next set of orders */
    })
}

I thought that this would work but I have reached a scenario where I see that some extra API calls are being made. I know this has something to do with chaining observables. What can I do to clean this up?

Thank you in advance.

Manuel Brás
  • 413
  • 7
  • 21

3 Answers3

8

You have multiple nested subscriptions. They lead to multiple open subscriptions which may never be closed. Instead you'd need to use various RxJS operators available to restrict it to a single subscription.

And seeing you need to trigger two requests in parallel, you could also use RxJS forkJoin function.

Refer here for a quick run down.

In short

  • switchMap operator to map from one observable to another
  • filter operator to continue the operator chain based on a condition
  • forkJoin to combine and trigger multiple observables in parallel

Try the following

ngOnInit(): void {
  this.subscription = timer(0, 120 * 1000).pipe(
    switchMap(() => this.shopService.getPreferencesAsObservable()),
    filter(preferences => !!preferences) // emit only if `preferences` is defined and truthy
    switchMap(() => 
      forkJoin({
        slots: getInitialPendingOrders(true),
        noSlots: getInitialPendingOrders(false)
      })
    )
  ).subscribe({
    next: ({ slots, noSlots }) => {
      // do stuff with orders from `slots` and `noSlots` responses
    },
    error: (error: any) => {
      // handle error
    }
  });
}

getInitialPendingOrders(slots: boolean): Observable<any> {
  return this.apiService.fetchShopOrders("status=PENDING" + slots ? "&only_slots=true" : '');
}

Update

As a rule of thumb, you should return the observable and subscribe only where it's response is required. In your case you could pipe in a switchMap to each argument of the forkJoin and return an observable conditionally. When you do not wish to return anything return RxJS constand EMPTY to emit results from the forkJoin. Note that forkJoin would only emit when all it's source observables complete.

ngOnInit(): void {
  this.subscription = timer(0, 120 * 1000).pipe(
    switchMap(() => this.shopService.getPreferencesAsObservable()),
    filter(preferences => !!preferences) // emit only if `preferences` is defined and truthy
    switchMap(() => 
      forkJoin({
        slots: getInitialPendingOrders(true).pipe(
          switchMap((orders: any) => {
            /* do stuff with orders */
            return orders.next_page ? this.getNextSlotsSetOfPendingOrders() : EMPTY;
          })
        ),
        noSlots: getInitialPendingOrders(false).pipe(
          switchMap((orders: any) => {
            /* do stuff with orders */
            return orders.next_page ? this.getNextNoSlotsSetOfPendingOrders() : EMPTY;
          })
        )
      })
    )
  ).subscribe({
    next: ({ slots, noSlots }) => {
      // do stuff with next set of orders from `slots` and `noSlots`
    },
    error: (error: any) => {
      // handle error
    }
  });
}

getInitialPendingOrders(slots: boolean): Observable<any> {
  return this.apiService.fetchShopOrders("status=PENDING" + !!slots ? "&only_slots=true" : '');
}

getNextSlotsSetOfPendingOrders(): Observable<any> { 
  return this.apiService.fetchNextSetOfOrders(this.nextSlotsUrl);
}

getNextNoSlotsSetOfPendingOrders(): Observable<any> { 
  return this.apiService.fetchNextSetOfOrders(this.nextNoSlotsUrl);
}
ruth
  • 29,535
  • 4
  • 30
  • 57
  • This seems exactly what I want! However, I forgot to add in the question that I have another possible layer of subscriptions (updated now)... Could you take a look and maybe update your answer? Thanks! – Manuel Brás Jun 09 '21 at 14:30
  • 1
    @ManuelBrás: Is everything called correctly? The function `getNextSlotsSetOfPendingOrders()` is never seem to be called. – ruth Jun 09 '21 at 14:32
  • My mistake! Updated the question now. – Manuel Brás Jun 09 '21 at 14:38
  • Awesome, I'm about to test this! Btw, if I wanted to initialize some component fields with some of the preferences I receive, where would I do this? Maybe a `tap()` into the preferences after the `filter()`? – Manuel Brás Jun 09 '21 at 17:44
  • 1
    @ManuelBrás: Yes you could pipe in a `tap` operator like `tap(preferences => this.someField = preferences),`. Either before or after the `filter` operator depends on your requirement. If you need all the preferences it could be before the filter. If you need `preferences` only passing a condition, it could be after the `filter` operator. – ruth Jun 09 '21 at 20:36
  • Awesome stuff, thank you. I marked your answer as correct in the meantime. Just one last thing: I wanted to be straight to the point in my question so I didn't mention that inside `getNextSlotsSetOfPendingOrders()` and `getNextNoSlotsSetOfPendingOrders()` it could also happen that I need to get another page of orders, therefore the method would be calling itself with the next page url. Would your solution still apply? – Manuel Brás Jun 09 '21 at 20:54
  • 1
    @ManuelBrás: Yes if you need to trigger another observable within the `getNextSlotsSetOfPendingOrders()` (and it's counterpart), you could do `getNextSlotsSetOfPendingOrders().pipe(switchMap(orders => /* return observable */))` and so on. – ruth Jun 10 '21 at 06:52
3

As a rule of thumb, never call subscribe inside subscribe (there are exceptions of course).

I'd recommend you read Is it good way to call subscribe inside subscribe? and have a look to the different operators (forkJoin, mergeMap, ...) RxJs offers to combine observables sequentially, in parallel... depending on your requirements.

Gaël J
  • 11,274
  • 4
  • 17
  • 32
  • Thanks! I'll take a look. As an Angular amateur I've been struggling getting started with RxJs. Do you have a reference on any tutorial or course specific to RxJs so I can learn it in depth? – Manuel Brás Jun 09 '21 at 21:02
1

This is a pretty open ended question as there are a lot of ways to chain observables. Combination operators on Learn RxJs is a great place to look up all of the different options.

After using rxjs for a while I've started getting into the habit of separating complex processes into const variables or readonly properties, and then having a separate observable that chains them together as kind. I find it makes things easier to maintain as a class grows. Plus, a lot of times a method was unnecessary as it was just setting up the same stream - execution only occurs when subscribing.

In the example below, after each timer execution the stream switches flow to the getPreferencesAsObservable() method which presumably returns an observable that emits once. After the emission, filer() is used only when valid preferences are returned. Finally forkJoin() is used to combine the last results from the two observables that get shop orders.

At one point I chose to use concatMap and at another switchMap. The former was chosen so that no emissions got skipped in case one emission was true and the subsequent was false. With concatMap the following observables would get executed, but with switchMap if the previous emission did not complete then it would get terminated. Really the choice is up to you based upon the behavior you desire.

readonly noSlotsProcess$ = 
  this.apiService.fetchShopOrders("status=PENDING").pipe(
    tap((x) => /* do stuff */)
  );
readonly slotsProcess$ = 
  this.apiService.fetchShopOrders("status=PENDING&only_slots=true").pipe(
    tap((x) => /* do stuff */)
  );
readonly refresh$ = timer(0, 120 * 1000).pipe(
  concatMap(() => this.shopService.getPreferencesAsObservable()),
  filter((preferences => !!preferences),
  switchMap(() => { forkJoin( noSlots: this.noSlotsProcess$, slots: this.slotsProcess$ }))
);
ngOnInit(): void {
    this.subscription = this.refresh$.subscribe();
}

Daniel Gimenez
  • 18,530
  • 3
  • 50
  • 70
  • Even though @MichaelD's answer is working for me, this solution is very clean. I can defnitely take something from it as well, so thank you for taking the time! – Manuel Brás Jun 09 '21 at 23:58