4

I'd basically like to implement an observable pool consumer.

So, a service makes an http call to retrieve a number of results and then a component is consuming those results one by one. When the service detects that is running out of results it will make a call again to the server for a number of new results.

I started trying to make single http calls and subscribe and unsubscribe every time, but it seems quite messy and confusing.

I'd like something like this:

this.http
  .post(API, postData)
  .map((response: Response) => response.json())
  .subscribe(data => this.saveData(data))
  .repeat(when_certain_condition)

Is there any way to achieve this? I read about repeat but it seems like you need to provide the value to be repeated, so it doesn´t seem like the way to go.

Any ideas? Thanks

martin
  • 93,354
  • 25
  • 191
  • 226
David
  • 3,364
  • 10
  • 41
  • 84
  • This is very dependent on what your response looks like and how exactly you consume the items. It seems like a very similar question is this http://stackoverflow.com/questions/40348748/rxjs-consume-api-output-and-re-query-when-cache-is-empty/40359622#40359622 – martin Jan 27 '17 at 09:20

2 Answers2

3

I faced the very same situation, and I solved with the following approach.

this.http
  .post(API, postData)
  .map((response: Response) => response.json())
  .delay(2000)
  .flatMap(data => {
    this.saveData(data);
    if (when_certain_condition) {
      return Observable.throw('retry');
    } else {
      return Observable.of(data);
    }
  }).retry().subscribe();

Explanation:

  1. You need to use a flatMap() to transform the items emitted by an Observable into Observables
  2. You need to use Observable.throw() to force an error, in this way the retry() will be invoked
  3. I used a delay(2000), in this way I wait 2 seconds before retrying another call
Max
  • 1,989
  • 4
  • 17
  • 20
0

Ok, if doWhile is not available, then emulate it with retry:

this.http
  .post(API, postData)
  .map((response: Response) => response.json())
  .do(data => {
       this.saveData(data);
       if(data.length > 0){
           throw new Error();
       }
   })
  .retry()
  .subscribe();
kemsky
  • 14,727
  • 3
  • 32
  • 51
  • 1
    The `doWhile()` operator is no longer implemented. https://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md#operators-renamed-or-removed – martin Jan 27 '17 at 09:34
  • Man... I was getting crazy trying to find it... :( – David Jan 27 '17 at 09:50
  • retry doesn´t seem to do what I want, it responds to errors I managed to get what I wanted but not with a single observable, here http://stackoverflow.com/questions/41891630/rxjs-observable-not-reaching-subscribe/41892103#41892103 – David Jan 27 '17 at 12:02