1

I need to build a stream that makes an API request immediately when it is called and then again at the start of each hour (1:00 PM, 2:00 PM, ...) for as long as the page isn't refreshed. I built this with setTimeout(), but I want to implement it with RxJS. Can you help me, please?

isUpdated() {
    return new Observable<Object>(function update(obs) {
        this.http.get(`/update`).subscribe(data => {
            obs.next(data);
        });
        const timer = (60 - new Date().getMinutes()) * 60 * 1000;
        setTimeout(() => update.call(this, obs), timer);
    }.bind(this));
}

//call
isUpdated().subscribe(data => console.log(data));
nothing

5 Answers

1

Maybe you could do it like this (I didn't test this code):

import { merge, concat, of, timer, interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';

...

isUpdated() {
  return concat(
    merge(
      of(null), // make the call immediately
      timer((60 - new Date().getMinutes()) * 60 * 1000), // the next full hour eg. 2:00
    ),
    interval(60 * 60 * 1000), // every hour
  ).pipe(
    switchMap(() => this.http.get(...)),
  )
}

concat subscribes to interval only after the merge completes, which happens at the next full hour (e.g. 2:00). From then on, interval emits every hour.

@MrkSef had a good idea that this could be simplified to something like this:

isUpdated() {
  return timer(
    (60 - new Date().getMinutes()) * 60 * 1000, // the next full hour eg. 2:00
    60 * 60 * 1000 // every hour
  ).pipe(
    startWith(null), // initial request
    switchMap(() => this.http.get(...)),
  )
}
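
One detail worth checking in both variants: the expression based on getMinutes() ignores seconds and milliseconds, so at 14:40:30 it waits a full 20 minutes and the first tick lands at 15:00:30 rather than 15:00:00. A minimal sketch of a more precise delay, assuming local time is what matters; msUntilNextFullHour and minuteOnlyDelay are hypothetical helper names, not part of RxJS or of the code above:

```typescript
// Hypothetical helper: milliseconds from `now` until the next full local hour.
function msUntilNextFullHour(now: Date = new Date()): number {
  const next = new Date(now.getTime());
  // Move to hour + 1 with minutes, seconds and milliseconds zeroed;
  // Date handles the rollover past midnight on its own.
  next.setHours(now.getHours() + 1, 0, 0, 0);
  return next.getTime() - now.getTime();
}

// The minute-only expression from the snippets above, kept for comparison.
function minuteOnlyDelay(now: Date): number {
  return (60 - now.getMinutes()) * 60 * 1000;
}

const sample = new Date(2020, 11, 3, 14, 40, 30); // 14:40:30 local time
const exact = msUntilNextFullHour(sample); // 19 min 30 s
const coarse = minuteOnlyDelay(sample); // 20 min, i.e. 30 s past the hour
```

The result could then be passed as the first argument of timer, e.g. timer(msUntilNextFullHour(), 60 * 60 * 1000).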
marc_s
martin
1

I think that to solve this problem, you need to break it down into smaller pieces.

First of all, we know that at some point, based on the current time, we'll want to know when to trigger the next call. If we get a timestamp which gives us the current time in ms and we want to get the number of ms before the next hour, here's how we can do it:

const timeToNextHourInMs = (currentTimestampMs) => {
  const timestampSeconds = currentTimestampMs / 1000;

  const numberOfSecondsIntoTheCurrentHour = timestampSeconds % 3600;

  const numberOfSecondsToTheNextHour = 3600 - numberOfSecondsIntoTheCurrentHour;

  return numberOfSecondsToTheNextHour * 1000;
};

I hope that the variable names are explicit enough that I do not need to comment but let me know otherwise.
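
One caveat with the modulo approach: a Unix timestamp is aligned to UTC hours, so the result matches the local wall clock only in time zones whose offset is a whole number of hours. Here is a hedged sketch of a variant that takes the offset into account; timeToNextLocalHourInMs and its utcOffsetMinutes parameter are names I made up for illustration:

```typescript
const HOUR_MS = 60 * 60 * 1000;

// Hypothetical variant of timeToNextHourInMs. `utcOffsetMinutes` is the local
// offset east of UTC (e.g. 330 for UTC+5:30); in a browser it could be
// derived as -new Date().getTimezoneOffset().
function timeToNextLocalHourInMs(
  currentTimestampMs: number,
  utcOffsetMinutes: number = 0
): number {
  // Shift the timestamp so that "hour boundaries" follow the local clock.
  const localMs = currentTimestampMs + utcOffsetMinutes * 60 * 1000;
  // Double modulo keeps the remainder non-negative even for negative inputs.
  const msIntoCurrentHour = ((localMs % HOUR_MS) + HOUR_MS) % HOUR_MS;
  return HOUR_MS - msIntoCurrentHour;
}

const at1440Utc = Date.UTC(2020, 11, 3, 14, 40, 0);
const utcWait = timeToNextLocalHourInMs(at1440Utc); // 14:40 UTC: 20 minutes
const indiaWait = timeToNextLocalHourInMs(at1440Utc, 330); // 20:10 local: 50 minutes
```

With an offset of 0 this behaves like the function above, so it can be swapped in without changing the rest of the stream.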

Next, we want to tackle the stream issue:

  • We want to trigger an HTTP call straight away
  • Get the emitted value straight away
  • Do all the above again every time a new hour starts (1:00, 2:00, 3:00, etc.)

Here's how you can do this:

this.http.get(`/update`).pipe(
  timestamp(),
  switchMap(({ timestamp, value }) =>
    concat(
      of(value),
      EMPTY.pipe(delay(timeToNextHourInMs(timestamp)))
    )
  ),
  repeat()
);

Let's go through the above logic:

  • First, we make the HTTP call straight away
  • Once the HTTP call is done, we get the current timestamp (later on, we use it to work out when to make the next call)
  • We do a switchMap, but as our HTTP call is only ever going to return 1 value, it doesn't really matter in this very specific case. We could use flatMap or concatMap too
  • Inside the switchMap, we use concat to first send the value that we just got from the HTTP call, and then keep that observable alive until the end of the current hour (by using the function we created earlier)
  • At the end of the current hour, the stream will therefore complete. BUT, as we've got a repeat, as soon as the stream completes, we'll subscribe to it again (and as a reminder, the stream will only complete at the very beginning of a new hour!)
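
The pattern walked through above relies on delay postponing the completion of EMPTY, and I'm not sure every RxJS version behaves that way. As a hedged alternative, this sketch waits with timer plus ignoreElements, which emits nothing and completes once the wait is over. pollOnTheHour is a hypothetical helper name, and the demo uses a fake request on a VirtualTimeScheduler with a fixed one-hour wait so that it runs instantly:

```typescript
import {
  Observable, SchedulerLike, VirtualTimeScheduler,
  asyncScheduler, concat, defer, of, timer,
} from 'rxjs';
import { ignoreElements, repeat, switchMap, take } from 'rxjs/operators';

// Hypothetical helper (not from the answer): emits each response, then stays
// silent for `msToNextHour(now)` before repeat() resubscribes to `request$`.
function pollOnTheHour<T>(
  request$: Observable<T>,
  msToNextHour: (nowMs: number) => number,
  scheduler: SchedulerLike = asyncScheduler
): Observable<T> {
  return request$.pipe(
    switchMap((value) =>
      concat(
        of(value),
        // timer emits once after the wait and then completes;
        // ignoreElements drops that emission and keeps the completion.
        timer(msToNextHour(scheduler.now()), scheduler).pipe(ignoreElements())
      )
    ),
    repeat()
  );
}

// Demo on virtual time: a fake request that counts how often it is subscribed.
const scheduler = new VirtualTimeScheduler();
let calls = 0;
const fakeRequest$ = defer(() => of(`response ${++calls}`));
const received: string[] = [];

pollOnTheHour(fakeRequest$, () => 60 * 60 * 1000, scheduler)
  .pipe(take(3)) // stop the otherwise endless stream after three responses
  .subscribe((value) => received.push(value));

scheduler.flush(); // runs the scheduled waits without real delays
```

In the real stream, the first argument would be this.http.get(`/update`) and the second would be timeToNextHourInMs, with the scheduler left at its default.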

One thing I'd suggest adding here, although it isn't a requirement of the original question, is some error handling, so that a failed call is automatically retried a few seconds later. Otherwise, if your network happens to be down for 5s at the exact moment the polling kicks in, your stream will error straight away.

For this, you can refer to this brilliant answer and do that in a reusable custom operator:

const RETRY_DELAY = 2000;
const MAX_RETRY_FOR_ONE_HTTP_CALL = 3;

const automaticRetry = () => (obs$) =>
  obs$.pipe(
    retryWhen((error$) =>
      error$.pipe(
        concatMap((error, index) =>
          iif(
            () => index >= MAX_RETRY_FOR_ONE_HTTP_CALL,
            throwError(error),
            of(error).pipe(delay(RETRY_DELAY))
          )
        )
      )
    )
  );

This will retry the observable 3 times with a delay between each retry. After 3 times, the stream will error by throwing the last emitted error.

Now, we can just add this custom operator to our stream:

this.http.get(`/update`).pipe(
  automaticRetry(),
  timestamp(),
  switchMap(({ timestamp, value }) =>
    concat(
      of(value),
      EMPTY.pipe(delay(timeToNextHourInMs(timestamp)))
    )
  ),
  repeat()
);

I haven't actually tested the code above so please do that on your side and let me know how it goes. But if my logic is correct here's how things should go:

  • Imagine you start your app at 2:40
  • An HTTP call is made straight away
  • You get the response pretty much instantly
  • The stream is set to be kept open for 20 minutes
  • At 3:00, the stream is completed and the repeat kicks in: we do another HTTP call
  • This time, the server got re-deployed and was not available for a few seconds
  • Internally, the stream errors, but thanks to our custom operator automaticRetry it waits for 2 seconds (RETRY_DELAY) then retries the HTTP call once, still nothing. It waits another 2 seconds and this time it's fine, the result is passed downstream
  • Repeat this indefinitely :)

Let me know how it goes

maxime1992
1

I think it is very simple.

timer accepts two parameters: the first is the delay before the first emission, and the second is the interval between the following emissions. You got the next full hour offset right, so I'm using your code for that.

Use startWith(null) to start immediately.

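import { timer } from 'rxjs';
import { startWith, switchMap } from 'rxjs/operators';

const startDelayMs = (60 - new Date().getMinutes()) * 60 * 1000;
const hourMs = 60 * 60 * 1000;

timer(startDelayMs, hourMs).pipe(
   startWith(null), // emit once immediately
   switchMap(() => this.http.get(`/update`)) // do your call here
)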
kvetis
0

You can simply use rxjs interval

import { interval } from 'rxjs';

const seconds = 3600;
const source = interval(seconds * 1000);

source.subscribe(()=> {
  // make api request here
});
Armen Stepanyan
  • 1
    Your example will emit every 60 minutes, but I need to calculate how many minutes are left until the next hour – nothing Dec 03 '20 at 10:36
0

Ok, so you want to:

  1. create two streams: the first one ticks every hour (3 600 000 ms), the second one controls whether these hourly ticks are allowed through
const every60Minutes$ = interval(60 * 60 * 1000);
const isAllowedToProceedSubject = new Subject<boolean>(); // a Subject is also an Observable stream
  2. combine those two streams and only emit values while the latest value of the second stream is true
const every60MinsIfAllowed$ = combineLatest([
  isAllowedToProceedSubject,
  every60Minutes$
]).pipe(
  filter(([isAllowed, intervalTick]) => {
    return isAllowed;
  }),
  flatMap(() => {
    return of(new Date()); // replace with HTTP call
  })
)
  3. be able to control the second stream
setTimeout(() => {
  isAllowedToProceedSubject.next(true); // this allows the values to be emitted
}, 2500);

// or add some click handler

Working example (but with emitting every 2000ms for better clarity)

Yevhenii Dovhaniuk