5

I am having a use case where I need to limit the number of outgoing http requests. Yes, I do have rate limiter on the server-side but a limit on the number of active http requests is also need on the front end too.For that reason I am attempting to implement a sliding window protocol where at any single time I will only have n active requests.

This approach using Rxjs works fine in general, see here: https://jsbin.com/pacicubeci/1/edit?js,console,output

but I am not clear how to use the same logic with http interceptors. My attempt below fails at compile time with the following error:

Type 'Subscription' is missing the following properties from type 'Observable<HttpEvent>': _isScalar, source, operator, lift, and 114 more.(2740)

With that, how can I return an observable and maintain a queue at the http interceptor at the same time?Is my approach flawed? Can I use http interceptors to http rate limit at all?

@Injectable()
export class I1 implements HttpInterceptor {
  intercept(
    req: HttpRequest<any>,
    next: HttpHandler
  ): Observable<HttpEvent<any>> {
    const modified = req.clone({ setHeaders: { "Custom-Header-1": "1" } });

    return next
      .handle(req)
      .do((ev: HttpEvent<any>) => {
        if (ev instanceof HttpResponse) {
          console.log(ev);
        }
      })
      .pipe(
        bufferTime(1000, null, 1),
        filter(buffer => buffer.length > 0),
        concatMap(buffer => of(buffer).pipe(delay(1000)))
      )
      .subscribe(console.log);
      }
    }

https://stackblitz.com/edit/angular-interceptors-npqkjp?file=app/interceptors.ts

matcheek
  • 4,887
  • 9
  • 42
  • 73
  • You say *"active requests"* - browsers already limit this per host for HTTP 1.1, see e.g. https://stackoverflow.com/q/985431/3001761. – jonrsharpe Jan 18 '21 at 22:07
  • Yes, I am aware of that. Thinking http2 where I the number of connection2 could be a lot higher that those for http1.1 – matcheek Jan 18 '21 at 22:09
  • Does this answer your question? [How to limit API calls per second with angular2](https://stackoverflow.com/questions/42260300/how-to-limit-api-calls-per-second-with-angular2) – Eyeslandic Jan 19 '21 at 11:12
  • https://medium.com/leantaas-engineering/throttling-api-calls-in-angular-64b8c0e0e137 here are some ideas – Eyeslandic Jan 19 '21 at 11:14

2 Answers2

5

If you'd like to find out more about how interceptors and the HttpClientModule work under the hood, you could check out this article: Exploring the HttpClientModule in Angular.

Is my approach flawed? In this case, the problem is that next.handle is expected to return an Observable, but by subscribing to it, it returns a Subscription.

To get a better understanding of why, I will paste a snippet copied from the article linked above:

const obsBE$ = new Observable(obs => {
  timer(1000)
    .subscribe(() => {
      // console.log('%c [OBSERVABLE]', 'color: red;');

      obs.next({ response: { data: ['foo', 'bar'] } });

      // Stop receiving values!
      obs.complete();
    })

    return () => {
      console.warn("I've had enough values!");
    }
});

// Composing interceptors the chain
const obsI1$ = obsBE$
  .pipe(
    tap(() => console.log('%c [i1]', 'color: blue;')),
    map(r => ({ ...r, i1: 'intercepted by i1!' }))
  );

let retryCnt = 0;
const obsI2$ = obsI1$
  .pipe(
    tap(() => console.log('%c [i2]', 'color: green;')),
    map(r => { 
      if (++retryCnt <=3) {
        throw new Error('err!') 
      }

      return r;
    }),
    catchError((err, caught) => {
      return getRefreshToken()
        .pipe(
          switchMap(() => /* obsI2$ */caught),
        )
    })
  );

const obsI3$ = obsI2$
  .pipe(
    tap(() => console.log('%c [i3]', 'color: orange;')),
    map(r => ({ ...r, i3: 'intercepted by i3!' }))
  );

function getRefreshToken () {
  return timer(1500)
    .pipe(q
      map(() => ({ token: 'TOKEN HERE' })),
    );
}

function get () {
  return obsI3$
}

get()
  .subscribe(console.log)

/* 
-->
[i1]
[i2]
I've had enough values!
[i1]
[i2]
I've had enough values!
[i1]
[i2]
I've had enough values!
[i1]
[i2]
[i3]
{
  "response": {
    "data": [
      "foo",
      "bar"
    ]
  },
  "i1": "intercepted by i1!",
  "i3": "intercepted by i3!"
}
I've had enough values!
*/

StackBlitz demo.

The gist is that interceptors create some sort of chain which ends with an observable that is responsible for making the actual request. This is the last node from the chain:

return new Observable((observer: Observer<HttpEvent<any>>) => {
  // Start by setting up the XHR object with request method, URL, and withCredentials flag.
  const xhr = this.xhrFactory.build();
  xhr.open(req.method, req.urlWithParams);
  if (!!req.withCredentials) {
    xhr.withCredentials = true;
  }
  /* ... */
})

how can I return an observable and maintain a queue at the http interceptor at the same time

I think a way to solve this is to create an interceptor that will contain the queue logic and make its intercept method return an Observable, so that it can be subscribed to:

const queueSubject = new Subject<Observable>();

const pendingQueue$ = queueSubject.pipe(
  // using `mergeAll` because the Subject's `values` are Observables
  mergeAll(limit),
  share(),
);

intercept (req, next) {
  // `next.handle(req)` - it's fine to do this, no request will fire until the observable is subscribed
  queueSubject.next(
    next.handle(req)
      .pipe(
        // not interested in `Sent` events
        filter(ev => ev instanceof HttpResponse),

        filter(resp => resp.url === req.url),
      )
  );

  return pendingQueue$;
}

The filter operators were used because by using share, the responses will be sent to all subscribers. Imagine you're synchronously calling http.get 5 times, so 5 new subscribers for share's Subject, and the last one will receive its response, but the response of other requests as well. So use can use filter in order to give the request the right response, in this case by comparing the URL of the request(req.url) with the URL we got from the HttpResponse.url:

observer.next(new HttpResponse({
  body,
  headers,
  status,
  statusText,
  url: url || undefined,
}));

Link for the above snippet.


Now, why did we use share() ?

Let's see a simpler example first:

const s = new Subject();

const queue$ = s.pipe(
  mergeAll()
)

function intercept (req) {
  s.next(of(req));
  
  return queue$
}

// making request 1
intercept({ url: 'req 1' }).subscribe();

// making request 2
intercept({ url: 'req 2' }).subscribe();

// making request 3
intercept({ url: 'req 3' }).subscribe();

At this point, the Subject s should have 3 subscribers. This is because when you return queue, you return s.pipe(...) and when you subscribe to that, it's the same as doing:

s.pipe(/* ... */).subscribe()

so, that's why the subject will have 3 subscribers at the end.

Now let's examine the same snippet, but with share():

const queue$ = s.pipe(
  mergeAll(),
  share()
);

// making request 1
intercept({ url: 'req 1' }).subscribe();

// making request 2
intercept({ url: 'req 2' }).subscribe();

// making request 3
intercept({ url: 'req 3' }).subscribe();

After you subscribe to request 1, share will create a Subject instance and all the subsequent subscribers will belong to it, instead of belonging to the main Subject s. So, s will have only one subscriber. This will make sure that we implement the queue correctly, because although the Subject s has only one subscriber, it will still accept s.next() values, whose results will be passed along to the other subject(that one which comes from share()), which will eventually send the responses to all of its subscribers.

Andrei Gătej
  • 11,116
  • 1
  • 14
  • 31
  • Thank you for you an in-depth answer. I get the idea on how this should work. Loving the simplicity of it however I got type mismatches when I followed these guidelines. https://stackblitz.com/edit/slidiong-window-with-mergeall?devtoolsheight=33&file=app/request-queue.service.ts – matcheek Jan 19 '21 at 17:36
  • 1
    @matcheek it should compile successfully now: https://stackblitz.com/edit/slidiong-window-with-mergeall-shuehd?file=app%2Frequest-queue.service.ts – Andrei Gătej Jan 19 '21 at 18:07
  • Just one more question. Upon revisting this stackblits above I have noticed two things: 1. Unlike in the SO answer I cannot see 'subscribe' on the intercept call. 2. all http queries get the same response assigned. Or at least that what it seems to me. Would you mind either revisting your answer or the stackblitz? – matcheek Jan 20 '21 at 11:37
  • 1
    @matcheek in the answer `intercept()` is subscribed just for demonstration purposes. They are automatically subscribed in angular, so you don't have to worry about that. Regarding the second point, you're right, the logic responsible for assigning the responses should reside in the interceptor, not in the service. Let me know if it works so that I can update the answer. – Andrei Gătej Jan 20 '21 at 12:42
  • 1
    This should be given to new Stack Overflow subscribers as an example of what a great answer looks like. Thank you @AndreiGătej! – Ben Jan 30 '23 at 02:37
1

On your interceptor you are returning a subscription, not an Observable.

If you remove the line .subscribe(console.log) it should compile just fine. The subscription is done by the consumer.

If you want to console.log everything that gets emitted, use the tap(next => ...) operator

Edit - Hum, it solves the compilation error, but I'm not sure it will work as you'd want... I don't fully understand how interceptors work.

olivarra1
  • 3,269
  • 3
  • 23
  • 34
  • 1
    Thanks for your answer. Your change removes the compilation error but II am still puzzled whether or not it is possible to have sliding window on http interceptors. Interceptor function needs to return an Observable but sliding window only makes sense if and array or nested observables are return. Or at least this is my best take at Rxjs. – matcheek Jan 19 '21 at 10:57