1

I have http take say 10 seconds to complete (sometimes it take 1 second to complete) and interval run every 5 seconds.

I want interval to stop and wait until the http is complete then continue.

This is the reproduce the problem:

codesandbox.io

  refresh$ = interval(5 * 1000).pipe(tap(() => this.load()));

  fakehttp() {
    return timer(10 * 1000).pipe(take(1));
  }

  load() {
    this.fakehttp().subscribe((r) => {
      console.log("data");
    });
  }

  ngOnInit() {
    this.refresh$.subscribe((r) => {
      console.log("refresh!");
    });
  }
Jon Sud
  • 10,211
  • 17
  • 76
  • 174
  • Is `timer`/`interval` an RxJS thing? – VLAZ May 17 '21 at 11:23
  • Yes. this is the reproduce the problem. I have http take say 10 seconds to complete (sometimes it take 1 second to complete) and interval run every 5 seconds. I want interval to stop and wait until the http is complete then continue. – Jon Sud May 17 '21 at 11:24
  • OK, just wanted to make sure I read the correct documentation. – VLAZ May 17 '21 at 11:25
  • @VLAZ, yes. why not? `import { timer } from 'rxjs'` – Jon Sud May 17 '21 at 11:25
  • I'm looking at the code in the question only. – VLAZ May 17 '21 at 11:25
  • 1
    I think interval is the wrong way. A response could trigger a new request with something like a debounce. –  May 17 '21 at 11:27
  • @jabaa I need to refresh the data every X seconds. what you suggest I do? – Jon Sud May 17 '21 at 11:28
  • Subscribe to your backend request, add a timeout and send a new request. –  May 17 '21 at 11:28
  • I looking for rxjs operators can help me with that. the code is much cleaner with those operators. also if I use rxjs then I chain functions instead of callbacks. also why using setTimeout and not interval/timer? I get it free also in rxjs – Jon Sud May 17 '21 at 11:31
  • _"why using setTimeout and not interval/timer?"_ Because with `setTimeout` you could solve the problem in 5 minutes. Probably you could use a timer for this but I don't know how –  May 17 '21 at 11:34
  • 1
    @jabaa it's not that much harder with RxJS. Same logic. Nicer outcome. I just need to read the docs because I've not used it a lot. – VLAZ May 17 '21 at 11:35

2 Answers2

1

Using tap to trigger the second observable results in multiple subscriptions.

Option 1

If I understand the question correctly, you're trying to re-trigger the HTTP request after 5 seconds AFTER the current request has emitted. In that case the time interval b/n the calls is dynamic. In that case you'd try to trigger the request manually after each notification using BehaviorSubject.

Try the following

import { timer, BehaviorSubject, Subject } from 'rxjs';
import { switchMap, takeUntil } from 'rxjs/operators';

export class Sample implements OnInit, OnDestroy {
  timerSrc = new BehaviorSubject<number>(0);  // <-- default time
  close = new Subject<any>();  // <-- use to close open observables

  ngOnInit() {
    this.timerSrc.asObservable().pipe(
      takeUntil(this.close),
      switchMap((time: number) => timer(time)),
      switchMap(() => this.httpRequest())
    ).subscribe({
      next: (response: any) => {
        // use response
        this.timerSrc.next(5000);  // <-- call again in 5 seconds
      },
      error: (error: any) => {
        // handle error
      }
    });
  }

  ngOnDestroy() {
    this.close.next();  // <-- close open subscription 
  }
}

Option 2

If you do not mind have a fixed timer of 5 seconds b/n each successive calls, you could pipe to a timer with 5 second interval using exhaustMap. It'd ignore incoming emissions until the inner observable (the HTTP request) has emitted.

Note that here there would be no guarantee that each successive would have a fixed time interval of 5 seconds. It might be anything b/n 0 - 5 seconds depending on the time taken for the HTTP request to emit.

import { timer, Subject } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

export class Sample implements OnInit, OnDestroy {
  close = new Subject<any>();  // <-- use to close open observables

  ngOnInit() {
    timer(0, 5000).pipe(
      takeUntil(this.close),
      exhaustMap(() => this.httpRequest())
    ).subscribe({
      next: (response: any) => {
        // use response
      },
      error: (error: any) => {
        // handle error
      }
    });
  }

  ngOnDestroy() {
    this.close.next();  // <-- close open subscription 
  }
}
ruth
  • 29,535
  • 4
  • 30
  • 57
0

You can create your own observable that you manually update. Then set up a recursive timeout() call in load(). That way you always process the result and then wait 10 seconds. You you will not end up with premature update calls:

refresh$ = new Rx.Subject();

fakehttp() {
  return timer(10 * 1000).pipe(take(1));
}

load() {
  this.fakehttp().subscribe((r) => {
    console.log("data");

    //push value to observable
    refresh$.next(r);
    //schedule the next execution
    timer(10 * 1000)
      .subscribe(() => this.load());
  });
}

ngOnInit() {
  this.refresh$.subscribe((r) => {
    console.log("refresh!");
  });

  //start the update cycle
  this.load();
}

The manual observable update approach comes from this answer of luisgabrial;

VLAZ
  • 26,331
  • 9
  • 49
  • 67
  • I'm looking at the RxJS docs for a possibly better way to make and update observables. – VLAZ May 17 '21 at 11:41
  • the problem with this solution is you call `load` function recursive and not release the last call so it's open in the stack and may lead to a memory leak. and what if I want just to load once? can I do it? I don't think so. you united the timer with the load functionality. – Shlomi Levi May 17 '21 at 11:47
  • @ShlomiLevi AFAIK, `timer` runs and finishes. So, the `.subscribe` callback is cleared from the stack. Is that not the case for RxJS? "*what if I want just to load once?*" not the case with OP's code. OP explicitly wants periodic refreshes. – VLAZ May 17 '21 at 11:50