
I want to poll an endpoint no faster than once a second, and no slower than the time it takes to poll the endpoint. There should never be more than one request outstanding.

I want a reactive programming way to poll an endpoint once a second; if a request takes longer than 1 second, the next request should fire immediately after it completes.

In the marble diagram below, the 2nd and 3rd requests take longer than 1 second, but the 4th and 5th requests finish more quickly. The next request fires either on the 1-second boundary, or immediately upon obtaining the data from the last outstanding request.

s---s---s---s---s---s---| # 1 second interval observable
r---r----r--------r-r---| # endpoint begin polling events
-d-------d--------dd-d--| # endpoint data response events

I'm trying to get the terminology correct in the marble diagram, so I'm assuming that the beginning of the endpoint requests should be the marble I label "r", and the marble event I label "d" has the endpoint data.

Here's how much code it took me to do this in plain js; however, subsequent requests do not fire immediately after the previous response arrives, as I asked for above.

var poll;
var previousData;
var isPolling = false;
var dashboardUrl = 'gui/metrics/dashboard';
var intervalMs = updateServiceConfig.getIntervalInMilliSecondForCharts();

return {
    startInterval: startInterval,
    stopInterval: stopInterval
};

function startInterval() {
    stopInterval();
    tryPolling(); // immediately hit the dashboard
    // attempt polling at the interval
    poll = $interval(tryPolling, intervalMs);
}

/**
 * attempt polling as long as there is no in-flight request
 * once the in-flight request completes or fails, allow the next request to be processed
 */
function tryPolling() {
    if (!isPolling) {
        isPolling = true;

        getDashboard()
        // if the dashboard either returns successful or fails, reset the polling boolean
            .then(resetPolling, resetPolling);
    }
}

/** there's no longer an in-flight request, so reset the polling boolean */
function resetPolling() {
    isPolling = false;
}

function stopInterval() {
    if (poll) {
        $interval.cancel(poll);
        poll = undefined;
    }
}

function getDashboard() {
    return restfulService.get(dashboardUrl)
        .then(updateDashboard);
}

function updateDashboard(data) {
    if (!utils.deepEqual(data, previousData)) {
        previousData = angular.copy(data);
        $rootScope.$broadcast('$dashboardLoaded', data);
    }
}
BinaryButterfly
activedecay

3 Answers

5

Here is my solution. It uses an internal subject, combineLatest and filter to ensure that requests don't accumulate if the responses are slower to arrive than the timer period.

The comments should explain how it works.

const delays = [100, 2000, 100, 3000];
const since = Date.now();
let index = 0;

function mock() {
    return Rx.Observable
    .of("res")
    .do(() => console.log("mock req at ", Date.now() - since, " ms"))
    .delay(delays[index++ % delays.length])
    .do(() => console.log("mock res at ", Date.now() - since, " ms"));
}

function poll() {

  return Rx.Observable.defer(() => {

    // Use defer so that the internal subject is created for each
    // subscription.
    const subject = new Rx.BehaviorSubject({ tick: -1, pending: false });

    return Rx.Observable
    
      // Combine the timer and the subject's state.
      .combineLatest(
        Rx.Observable.timer(0, 1000).do(tick => console.log("tick", tick)),
        subject
      )

      // Filter out combinations in which either a more recent tick
      // has not occurred or a request is pending.
      .filter(([tick, state]) => (tick !== state.tick) && !state.pending)

      // Update the subject's state.
      .do(([tick]) => subject.next({ tick, pending: true }))
      
      // Make the request and use the result selector to combine
      // the tick and the response.
      .mergeMap(([tick]) => mock(), ([tick], resp) => [tick, resp])

      // Update the subject's state.
      .do(([tick]) => subject.next({ tick, pending: false }))
      
      // Map the response.
      .map(([tick, resp]) => resp);
  });
}

poll().take(delays.length).subscribe(r => console.log(r));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
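
As a rough illustration of the timing this combineLatest/filter approach produces, here is a minimal virtual-clock sketch that needs no RxJS. The function name `simulateCombineLatestPolling` and the latencies passed to it are hypothetical, and it assumes no response lands exactly on a tick boundary.

```javascript
// Virtual-clock sketch of the combineLatest + filter polling above.
// Hypothetical helper; assumes no response lands exactly on a tick.
function simulateCombineLatestPolling(periodMs, durationsMs) {
  const starts = [];
  let start = 0; // the first request fires on tick 0
  for (const duration of durationsMs) {
    starts.push(start);
    const tickAtStart = Math.floor(start / periodMs);
    const responseAt = start + duration;
    const tickAtResponse = Math.floor(responseAt / periodMs);
    // If a newer tick arrived while the request was pending, the next
    // request fires as soon as the response arrives; otherwise it waits
    // for the next tick.
    start = tickAtResponse !== tickAtStart
      ? responseAt
      : (tickAtStart + 1) * periodMs;
  }
  return starts;
}

// Request start times (ms) for made-up latencies of 100, 2500, 100 and 3200 ms.
console.log(simulateCombineLatestPolling(1000, [100, 2500, 100, 3200]));
// → [0, 1000, 3500, 4000]
```

In this sketch the third request starts at 3500 ms, right after the slow response, and the fourth starts on the following tick at 4000 ms.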

It's just occurred to me that there is an operator that does exactly this: exhaustMap.

const delays = [100, 2000, 100, 3000];
const since = Date.now();
let index = 0;

function mock() {
  return Rx.Observable
    .of("res")
    .do(() => console.log("mock req at ", Date.now() - since, " ms"))
    .delay(delays[index++ % delays.length])
    .do(() => console.log("mock res at ", Date.now() - since, " ms"));
}

const poll = Rx.Observable
  .timer(0, 1000)
  .do(tick => console.log("tick", tick))
  .exhaustMap(() => mock());

poll.take(delays.length).subscribe(r => console.log(r));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
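
Note that exhaustMap ignores ticks that arrive while a request is pending, so after a slow response the next request waits for the next tick rather than firing immediately. The following virtual-clock sketch shows the resulting start times without RxJS; the name `simulateExhaustPolling` and the latencies are hypothetical, and a tick that coincides exactly with a response is assumed to be accepted.

```javascript
// Virtual-clock sketch of timer(0, period).exhaustMap(request).
// Hypothetical helper; a tick at exactly the response time is treated as accepted.
function simulateExhaustPolling(periodMs, durationsMs) {
  const starts = [];
  let busyUntil = 0; // time at which the in-flight request completes
  let next = 0;      // index of the next latency to use
  for (let tick = 0; next < durationsMs.length; tick++) {
    const now = tick * periodMs;
    if (now >= busyUntil) {
      // No request is outstanding, so this tick starts one.
      starts.push(now);
      busyUntil = now + durationsMs[next++];
    }
    // Otherwise the tick is dropped, as exhaustMap would do.
  }
  return starts;
}

// Request start times (ms) for made-up latencies of 100, 2500, 100 and 3200 ms.
console.log(simulateExhaustPolling(1000, [100, 2500, 100, 3200]));
// → [0, 1000, 4000, 5000]
```

Here the ticks at 2000 ms and 3000 ms are dropped because the second request is still pending until 3500 ms.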
cartant
  • Updated the answer to use `exhaustMap` - with which, the solution is trivial. – cartant Jan 12 '18 at 13:08
  • I may be confused about the implementation, but my example application queues up pending requests without waiting for them to complete before queueing up the next request. https://codepen.io/activedecay/pen/EoRNJv – activedecay Jan 12 '18 at 18:39
  • oh! i fixed it. see the codepen. i needed to pass a function that returned a promise, instead of the promise itself – activedecay Jan 12 '18 at 20:55
  • here's a great article on mapping strategies, and should help understanding why exhaustMap is the correct solution here. https://blog.angular-university.io/rxjs-higher-order-mapping/ – activedecay Mar 31 '20 at 18:05
3

I believe this does what you want:

let counter = 0;
function apiCall() {
  const delay = Math.random() * 1000;
  const count = ++counter;
  return Rx.Observable.timer(delay).mapTo(count);
}

Rx.Observable.timer(0, 1000)
  .mergeMap(() => apiCall())
  .take(1)
  .repeat()
  .subscribe(x => { console.log(x); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
  • timer(0, 1000): emits immediately and on a one second interval after that
  • mergeMap(...): subscribes to the observable returned by the api call and merges its result into the output. Because the function is invoked on every tick, each retry creates a new observable. If you don't want to create a new one on each retry, replace this with mergeMapTo(apiCall()).
  • take(1): forces the subscription to complete so the timer doesn't fire once the api has emitted
  • repeat(): starts the sequence over when the api emits

So the call will be made to the api immediately. If it doesn't return within one second, another call will be made each second. Once there is a response from one of the api calls, the timer will be canceled and the whole sequence started over. This will not cancel in-flight requests, which I believe is in line with your intent.

EDIT: If a later request returns before a previous request then the previous request will be thrown out.
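
To see how several requests can be outstanding at once with this approach, here is a minimal virtual-clock sketch of a single cycle (one subscription before repeat() restarts it). The name `simulateTakeOneRepeatCycle` and the latencies are hypothetical, and a tick that coincides exactly with the first response is assumed not to fire.

```javascript
// Virtual-clock sketch of one cycle of
// timer(0, period).mergeMap(request).take(1) before repeat() restarts it.
// Hypothetical helper; durationsMs[k] is the latency of the k-th request.
function simulateTakeOneRepeatCycle(periodMs, durationsMs) {
  const fired = [];
  let firstResponseAt = Infinity;
  let winner = -1;
  for (let k = 0; k < durationsMs.length; k++) {
    const start = k * periodMs;
    // Once any response has arrived, take(1) completes and the timer stops.
    if (start >= firstResponseAt) break;
    fired.push(start);
    const done = start + durationsMs[k];
    if (done < firstResponseAt) {
      firstResponseAt = done;
      winner = k;
    }
  }
  return { fired, winner, firstResponseAt };
}

// A slow first request (3500 ms) is overtaken by a fast second one (500 ms).
console.log(simulateTakeOneRepeatCycle(1000, [3500, 500, 4000]));
// → { fired: [0, 1000], winner: 1, firstResponseAt: 1500 }

// Three requests are fired before the first response arrives at 2500 ms.
console.log(simulateTakeOneRepeatCycle(1000, [2500, 5000, 5000]));
// → { fired: [0, 1000, 2000], winner: 0, firstResponseAt: 2500 }
```

In the first call the response of the request fired at 0 ms is never delivered, because the cycle ends at 1500 ms. In the second call three requests overlap, which is the back-pressure concern raised in the comments.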

bygrace
  • @activedecay sorry, I realized that my previous explanation was a bit off. So to clarify, `take(1)` will complete the stream behind it, which will kill the `timer(0, 1000)` (main goal) and the observable returned from the API call if it was not completed already. Then `repeat` will start the stream over and prevent the subscription from completing. – bygrace Jan 11 '18 at 22:20
  • there's an issue i found that when changing the random timeout to 5 seconds, it appears there are multiple api hits which aren't logged in your solution. https://codepen.io/activedecay/pen/WdJMPv?editors=1011 in my real implementation, it appears to duplicate what i've discovered in my codepen test; outstanding requests are still pending while another request is queued up. – activedecay Jan 11 '18 at 22:27
  • This solution is definitely better than mine! But there's an important thing to notice here: Back pressure. If for any reason your API call is taking more than 1s, it'll start creating back pressure (the timer will keep going). If you do polling, I guess it's because you don't want to refresh the page for a long time and this might eventually be a problem. Once again, I don't have a better idea for now and this is the best option so far ;) – maxime1992 Jan 11 '18 at 22:31
  • I didn't see your comment @activedecay but this is what I was talking about... It's a tricky/interesting question I hope to come with a best answer :) – maxime1992 Jan 11 '18 at 22:32
  • the backpressure issue is clear to me now, thank you @maxime. – activedecay Jan 11 '18 at 22:34
  • I did answer to a similar question months ago: https://stackoverflow.com/a/42659054/2398593 but I wasn't fully happy with my code... Maybe it might help you but I'm sure there's a better solution **EDIT** nop it doesn't match your need either... =P – maxime1992 Jan 11 '18 at 22:34
  • @activedecay It does appear that if a later request returns before a previous request that the previous request is discarded. Maybe you could consider that a feature ;) Anyways I agree about back-pressure. If one request is slow then I expect the next one to be slow too in most cases. You may want to up your timer from 1 second to something higher (15) to avoid back-pressure but still handle abnormalities. Or use websockets ;) – bygrace Jan 12 '18 at 12:02
2

I had to think about that for 15 minutes before I came up with an answer based only on rxjs, without side effects (no variable assignment) AND without back pressure!

const { Observable } = Rx;

const mockHttpRequest = url =>
  Observable
    .of('ok')
    .do(x => console.log('fetching...'))
    .delay(250);

const poll = (httpRequest$, ms) => {
  const tick$ = Observable.timer(ms);

  return Observable
    .zip(httpRequest$, tick$)
    .repeat()
    .map(([httpResult]) => httpResult);
};

poll(mockHttpRequest('your-url-here'), 1000)
  .do(console.log)
  .subscribe();

Here's a working Plunkr: https://plnkr.co/edit/sZTjLedNCE64bgLNhnaS?p=preview
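
As a rough check of the timing, each zip cycle ends only when both the request and the timer have emitted, so a cycle lasts as long as the slower of the two. The virtual-clock sketch below computes the resulting request start times without RxJS; the name `simulateZipPolling` and the latencies are hypothetical.

```javascript
// Virtual-clock sketch of zip(httpRequest$, timer(ms)).repeat().
// Hypothetical helper; each cycle lasts max(request latency, period).
function simulateZipPolling(periodMs, durationsMs) {
  const starts = [];
  let now = 0;
  for (const duration of durationsMs) {
    starts.push(now);
    // zip emits once both sources have emitted, then repeat() resubscribes.
    now += Math.max(duration, periodMs);
  }
  return starts;
}

// Request start times (ms) for made-up latencies of 250, 2500, 250 and 3200 ms.
console.log(simulateZipPolling(1000, [250, 2500, 250, 3200]));
// → [0, 1000, 3500, 4500]
```

Under these assumptions a new request never starts less than one period after the previous one, and it starts immediately after a response that took longer than the period.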

maxime1992
  • Since `.zip` waits for all observables to emit at least once, even if the http request completes in less than 1 second it will wait for `tick$` to emit (1 second delay). It looks like this would at most emit once per second. – bygrace Jan 11 '18 at 20:21
  • if bygrace is correct, it would not be a solution to my proposed question because the fastest possible response is preferred if it is available. – activedecay Jan 11 '18 at 21:28
  • this may be the one i end up picking, as it serves nearly exactly the same purpose as my plain-js solution, in many fewer lines. the backpressure issue cannot be ignored -- if the server is taking a long time to respond, i don't want to queue another request. this is more important to me than waiting less than 1 second to get the next datapoint. cheers mate! – activedecay Jan 11 '18 at 22:36
  • instead of mocking a request, can you change the answer to an actual implementation that would hit the resource, preferably one using a promise? i can't seem to get it right. https://codepen.io/activedecay/pen/ppVLBL – activedecay Jan 11 '18 at 23:04
  • You can replace `mockHttpRequest('your-url-here')` with whatever you want; even if you return a promise, rxjs will handle it on its own ;) For example, using angular you'd put `http.get('your-url-here')`. – maxime1992 Jan 11 '18 at 23:08