3

I would like to repeat an API call which returns a Promise, conditionally using rxjs.

The API method receives an id, which changes on every call by appending a counter suffix to it. The calls are repeated until the data meets some condition or the counter reaches a specific number X. How can this be done using rxjs?

API method:

fetchData(id):Promise<data>

try 1: fetchData(id)

try 2: fetchData(id_1)

try 3: fetchData(id_2)

Asi

5 Answers

3

IMO, it's better to handle the polling either through Promises or RxJS without mixing them. I'd illustrate using RxJS.

Try the following

  1. Convert the promise to an observable using the RxJS from function.
  2. Use RxJS functions like timer or interval to regularly emit a value on a fixed interval.
  3. Use a higher order mapping operator like switchMap to map from the outer emission to your API call. Refer here for a brief description about different types of higher order mapping operators.
  4. Use two takeWhile operators, one for each of your conditions, to complete the subscription.
  5. Use filter operator to only forward the emissions that pass the condition.

import { from, Observable } from 'rxjs';

fetchData(id: any): Observable<any> {  // <-- return an observable
  return from(apiCall(id));            // <-- `apiCall` stands in for the Promise-returning call; `from` converts the Promise to an Observable
}

import { timer } from 'rxjs';
import { filter, switchMap, takeWhile } from 'rxjs/operators';

timer(0, 5000).pipe(                        // <-- poll every 5 seconds
  takeWhile((index: number) => index < 20), // <-- stop polling after 20 attempts
  switchMap((index: number) => 
    this.someService.apiCall(index+1)       // <-- first emission from `timer` is 0
  ),
  takeWhile(                                // <-- stop polling when a condition from the response is unmet
    (response: any) => response.someValue !== someOtherValue,
    true                                    // <-- emit the response that failed the test
  ),
  filter((response: any) => 
    response.someValue === someOtherValue   // <-- forward only emissions that pass the condition
  )
).subscribe({
  next: (response: any) => {
    // handle response
  },
  error: (error: any) => {
    // handle error
  }
});

Edit: The condition in the 2nd takeWhile was doing the opposite of the requirement. I've adjusted the condition and included the inclusive=true argument. Thanks @Siddhant in the comments.

ruth
  • The 2nd `takeWhile` won't work as per the comment beside it. The polling will stop the moment `response.someValue === someOtherValue` evaluates to `false` and not `true`. Also, the use of the `filter` operator is redundant. – Siddhant Feb 15 '22 at 16:09
  • Thanks for your reply @ruth. The problem with using the timer is that the response from the first call may take longer than the time interval, so it will send the next request before evaluating the result of the previous call. – Asi Feb 15 '22 at 18:27
  • @Siddhant: Thanks for the input about the 2nd `takeWhile`. I've adjusted the condition and included the `inclusive=true` argument. About the `filter`: without it, the subscription callbacks would be triggered for all the emissions from the API call. OP didn't mention whether that is the required behavior, so I wouldn't drop the `filter` unless the subscription callbacks are also prepared to handle the API responses that don't pass OP's condition. – ruth Feb 16 '22 at 08:55
  • @Elham: In that case you could try the `concatMap` instead of `switchMap`. Please go through the attached answer to get the reasoning behind it. `concatMap` would ensure all the API calls are triggered serially while `switchMap` would cancel the current inner subscription when the `timer` emits. – ruth Feb 16 '22 at 08:58
  • @ruth Previously both `takeWhile` and `filter` had same `response.someValue === someOtherValue` condition, so `takeWhile` wouldn't have emitted value when condition wasn't met, and hence `filter` with same condition didn't seem to be of any use. With the new update `filter` will serve a purpose if OP has use case on it. – Siddhant Feb 16 '22 at 09:46
  • @Siddhant: `takeWhile` doesn't have any bearing on whether the value would be emitted or not similar to `filter`. The value would be emitted regardless of the condition. It only closes the subscription when the condition fails. Example: https://stackblitz.com/edit/rxjs-91wukh?file=index.ts – ruth Feb 16 '22 at 09:50
  • @ruth Remove `inclusive=true` and check the output. After that try adding `filter` right after `takeWhile` with same condition `value => value < 5` and without `inclusive=true` argument and hopefully you will get an idea what I was trying to convey. Even if I fail to convey the point, that should be fine. Thanks :) – Siddhant Feb 16 '22 at 10:42
  • @Siddhant: The `filter` condition you're proposing `value => value < 5` is of course redundant. But in analogy to OP's question, the condition would be something like `value => value === 5`. In which case the only notification that would be emitted is `5`. – ruth Feb 16 '22 at 10:53
  • Will it start fetching API again if the condition of first `takeWhile` becomes true later after once becoming false? – IsmailS Jun 16 '23 at 12:29
  • @IsmailS: No, `takeWhile` closes the subscription. The subscription must be triggered again to restart it. If you wish to manually control the polling instead of `takeWhile`, you could replace it with `switchMap` and return RxJS constants [`NEVER`](https://rxjs.dev/api/index/const/NEVER) or [`EMPTY`](https://rxjs.dev/api/index/const/EMPTY) based on your condition. – ruth Jun 16 '23 at 14:42
1

You can use concatMap to ensure only one call is tried at a time. range gives the maximum number of calls because takeWhile will unsubscribe early (before the range is done) if a condition is/isn't met.

That might look like this:

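import { range } from 'rxjs';
import { concatMap, takeWhile } from 'rxjs/operators';

// the data met some condition
function metCondition(data) {
  return Boolean(data /* something */);
}

// the counter reaches a specific number X
const x = 30;

range(0, x).pipe(
  // the first call uses 'id', later calls use 'id_1', 'id_2', ...
  concatMap(v => fetchData(v === 0 ? 'id' : `id_${v}`)),
  // note: the response that meets the condition is not emitted
  // unless `true` (inclusive) is passed as the second argument
  takeWhile(v => !metCondition(v))
).subscribe(datum => {
  /* Do something with your data? */
});
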
Mrk Sef
0

You could try retryWhen:

import { of } from 'rxjs';
import { map, retryWhen, switchMap, tap } from 'rxjs/operators';

let counter = 0;

const example = of(1).pipe(
  switchMap(x => of(counter)), // Replace of() with from(fetchData('id_'+counter))
  map(val => {
    if (val < 5) {
      counter++;
      // error will be picked up by retryWhen
      throw val;
    }
    return val;
  }),
  retryWhen(errors =>
    errors.pipe(
      // log error message
      tap(val => console.log(`Response was missing something`)),
    )
  )
);

It's not ideal as it needs a counter in the outer scope, but until there is a better solution (especially without time based retries) this should work.

robmcm
0

I know you've specified using rxjs, however you've also specified that fetchData() returns a promise and not an observable. In this case I would suggest using async and await rather than rxjs.

  async retryFetch() {
    let counter = 0;
    while (counter++ < 20 && !this.data) {
      this.data = await this.fetchData(counter);
    }
  }

You can put whatever you want in the conditional.
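
As a rough sketch, the same loop could be adapted to the id scheme from the question (`id`, `id_1`, `id_2`, ...). The `Data` shape, the `idForAttempt` helper, the `isDone` predicate and `fakeFetchData` are all assumptions made up for illustration; only `fetchData(id)` returning a Promise comes from the question.

```typescript
// Hypothetical response shape; the question does not say what `data` looks like.
interface Data {
  ready: boolean;
  id: string;
}

// Builds the id for a given attempt: "id", then "id_1", "id_2", ...
function idForAttempt(baseId: string, attempt: number): string {
  return attempt === 0 ? baseId : `${baseId}_${attempt}`;
}

// Calls `fetchData` one attempt at a time until `isDone` accepts a response
// or `maxTries` attempts have been made. Resolves to undefined if none qualified.
async function retryFetch(
  fetchData: (id: string) => Promise<Data>,
  baseId: string,
  maxTries: number,
  isDone: (data: Data) => boolean,
): Promise<Data | undefined> {
  for (let attempt = 0; attempt < maxTries; attempt++) {
    const data = await fetchData(idForAttempt(baseId, attempt));
    if (isDone(data)) {
      return data;
    }
  }
  return undefined;
}

// Stand-in for the real API (an assumption for this sketch): only "id_2" is ready.
const requestedIds: string[] = [];
const fakeFetchData = (id: string): Promise<Data> => {
  requestedIds.push(id);
  return Promise.resolve({ ready: id === 'id_2', id });
};

retryFetch(fakeFetchData, 'id', 20, (data) => data.ready)
  .then((data) => console.log(data?.id, requestedIds));
```

Because each call is awaited before the next one starts, a slow response can never overlap with the following request, and with this stand-in the loop stops after the third call.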

Even if your api call returned an observable, I would still suggest wrapping it in a promise and using this very readable solution.

The stackblitz below wraps a standard http.get with a promise and implements the above function. The promise will randomly return the data or undefined.

https://stackblitz.com/edit/angular-ivy-rflclt?file=src/app/app.component.ts

Chris Hamilton
-3

let count = 0;
const timerId = setInterval(() => {
  if (count) {
    fetchData(`id_${count}`);
  } else {
    fetchData('id');
  }
  count = count + 1;
}, 60000); // runs every 60000 milliseconds

const stopTimer = () => { // call this to stop the timer
  clearInterval(timerId);
};

N S Niko