
I was researching how to make multiple HTTP calls in Angular 10 without writing a long chain of calls and subscribes, and came across RxJS's forkJoin. The code is simple: just four HTTP calls whose results I process once they have all returned.

const observableCall1$ = this._configService.getSingle( "Param1", "Param2" ).pipe( catchError( err => of( err ) ) );
const observableCall2$ = this._configService.getSingle( "Param1", "Param2" ).pipe( catchError( err => of( err ) ) );
const observableCall3$ = this._configService.getSingle( "Param1", "Param2" ).pipe( catchError( err => of( err ) ) );
const observableCall4$ = this._configService.getSingle( "Param1", "Param2" ).pipe( catchError( err => of( err ) ) );
    forkJoin(
      [
        observableCall1$,
        observableCall2$,
        observableCall3$,
        observableCall4$,
      ]
    ).subscribe( ( results: Config[] ) => {
      //do stuff
    } )

The problem I'm running into is with expired tokens. If the token is expired, the call to get a new token happens four times, which is obviously unnecessary.

The code for the auth interceptor is this:

intercept( request: HttpRequest<unknown>, next: HttpHandler ): Observable<HttpEvent<unknown>> {
    const authToken = this._authentication.token;
    return ( !!authToken && ( request.url.indexOf( environment.authBaseUrl ) === -1 )
      ? next.handle( request.clone( {
        headers: request.headers
          .set( 'Authorization', `${authToken.token_type} ${authToken.access_token}` )
      } ) ) : next.handle( request ) ).pipe( catchError( ( error ) => {
        if ( error instanceof HttpErrorResponse && error.status === 401 ) {
          return this._authentication.exchangeRefreshToken()
            .pipe( mergeMap( token => next.handle( request.clone( {
              headers: request.headers.append( 'Authorization', `${token.token_type} ${token.access_token}` )
            } ) ) ) )
        }
        // Re-throw anything that is not a 401 so callers still see the error.
        return throwError( error );
      } ) )
  }

Code for exchanging refresh token:

const headers = this._httpHeaders;
    const body = new HttpParams()
      .append( 'grant_type', 'refresh_token' )
      .append( 'client_id', environment.clientId )
      .append( 'client_secret', environment.clientSecret )
      .append( 'refresh_token', this.token.refresh_token );

    var token = this._httpClient.post( environment.authBaseUrl, body, { headers } )
      .pipe( catchError( error => { return null; } ) ) as Observable<Token>;
    if ( !( token instanceof Token ) ) return this.getToken();
    else {
      this.token = new Token( token );
      return token;
    }

I think I understand the problem: forkJoin fires all four calls at the same time, each one passes through the interceptor as its own request, and when the token is expired they all get a 401, so each one asks for a new token. Is there an elegant way to request a new token only once and then continue with the rest of the calls?

Edit: I also tried putting those calls in the HTML with async pipes, i.e.:

<div *ngIf="observableCall$ | async">{{ observableCall | json }}</div>
    <div *ngIf="observableCall$ | async">{{ observableCall | json }}</div>

and the same thing happened. If there's a better way to do that I'm open for suggestions.

Thank you!

Edit(2): Code in auth interceptor thanks to Mark's help

import {
  HttpErrorResponse,
  HttpEvent,
  HttpHandler,
  HttpInterceptor,
  HttpRequest
} from '@angular/common/http';
import { Injectable } from '@angular/core';
// merge must be the RxJS creation function, not lodash's object merge.
import { merge, Observable, of, Subject } from 'rxjs';
import { catchError, exhaustMap, filter, first, mergeMap, share, shareReplay, startWith, switchMap, tap } from 'rxjs/operators';
import { Token } from 'src/app/models/token/token.model';
import { environment } from 'src/environments/environment';
import { AuthorizationService } from '../services/authorization/authorization.service';

@Injectable()
export class AuthorizationInterceptor implements HttpInterceptor {
  constructor( private _authentication: AuthorizationService ) { }
  private _exchangeToken$ = new Subject<boolean>();

  private _refreshToken$ = this._exchangeToken$.pipe(
    filter( x => x ),
    exhaustMap( () => this._authentication.exchangeRefreshToken() ),
    share()
  );

  private _refreshTokenCache$ = this._refreshToken$.pipe(
    startWith( null ),
    shareReplay( 1 )
  );

  exchangeRefreshToken( expiredToken: Token ): Observable<Token> {
    console.log( '2' );
    const exchange = () => {
      const startToken$ = of( true ).pipe(
        tap( x => this._exchangeToken$.next( x ) ),
        filter( _ => false )
      );
      return merge( startToken$, this._refreshToken$.pipe( first() ) );
    }

    return this._refreshTokenCache$.pipe(
      first(),
      switchMap( token =>
        token == null || token === expiredToken ?
          exchange() :
          of( token )
      )
    );
  }

  intercept( request: HttpRequest<unknown>, next: HttpHandler ): Observable<HttpEvent<unknown>> {
    const authToken = this._authentication.token;
    return ( !!authToken && ( request.url.indexOf( environment.authBaseUrl ) === -1 )
      ? next.handle( request.clone( {
        headers: request.headers
          .set( 'Authorization', `${authToken.token_type} ${authToken.access_token}` )
      } ) ) : next.handle( request ) ).pipe( catchError( ( error ) => {
        if ( error instanceof HttpErrorResponse && error.status === 401 ) {
          return this.exchangeRefreshToken( authToken )
            .pipe( mergeMap( token => next.handle( request.clone( {
              headers: request.headers.append( 'Authorization', `${token.token_type} ${token.access_token}` )
            } ) ) ) );
        }
      } ) );
  }
}
M.Chappell
  • Have `this._authentication.exchangeRefreshToken()` cache a result that it hands back on future calls until the next token expires. Rinse and repeat. – Mrk Sef Nov 02 '20 at 20:12
  • The auth service gets called for as many http calls that happen concurrently. I'm trying to add logic in the service that if there is already a call to exchange the refresh token, don't attempt to exchange more. Struggling with having the other calls wait for the first call to exchange the refresh token, since those need to wait until the first call got a new token. Ideas? – M.Chappell Nov 03 '20 at 17:07

1 Answer

You'll want to create a wrapper for this._authentication.exchangeRefreshToken() that is multicast and doesn't repeat attempts to get the refresh token.

One solution is to create a Subject that triggers the refresh-token calls.

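private _exchangeToken$ = new Subject<boolean>();

// exhaustMap ignores new triggers while an exchange is already in flight.
private _refreshToken$ = this._exchangeToken$.pipe(
  filter(x => x),
  exhaustMap(_ => this._authentication.exchangeRefreshToken()),
  share()
);

exchangeRefreshToken(): Observable<Token> {
  // Emits nothing; its only job is to trigger the exchange on subscription.
  const startToken$ = of(true).pipe(
    tap(x => this._exchangeToken$.next(x)),
    filter(_ => false)
  );
  // merge is the RxJS creation function (imported from 'rxjs').
  // Listen for the token first, then trigger, so the trigger is not lost
  // when nothing else is subscribed to _refreshToken$ yet.
  return merge(this._refreshToken$.pipe(first()), startToken$);
}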

In this pattern, all you've done is ensure that this._authentication.exchangeRefreshToken() is never called concurrently. If a second emission arrives on the _exchangeToken$ subject while an exchange is in flight, it is ignored and the caller waits on the result of the first call. What's nice here is that you can swap this._authentication.exchangeRefreshToken() for this.exchangeRefreshToken() in the interceptor and make no further changes.
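
The same "only one exchange in flight" idea can be seen without RxJS. The sketch below is not the interceptor code: it swaps the Subject/exhaustMap pipeline for a shared in-flight Promise, and `SingleFlight`, `fakeExchange`, and the `Token` shape are hypothetical names used only for illustration.

```typescript
// Hypothetical token shape, mirroring the fields used in the question.
interface Token {
  token_type: string;
  access_token: string;
}

// Wraps an async refresh function so that concurrent callers share one call.
class SingleFlight<T> {
  private inFlight: Promise<T> | null = null;

  constructor(private readonly refresh: () => Promise<T>) {}

  run(): Promise<T> {
    // A refresh is already running: hand back the same Promise.
    if (this.inFlight !== null) {
      return this.inFlight;
    }
    // No refresh is running: start one and forget it once it settles.
    const started = this.refresh();
    this.inFlight = started;
    const clear = () => { this.inFlight = null; };
    started.then(clear, clear);
    return started;
  }
}

// Stand-in for the real HTTP call (an assumption); counts how often it is invoked.
let exchangeCalls = 0;
const fakeExchange = (): Promise<Token> => {
  exchangeCalls += 1;
  return Promise.resolve({ token_type: 'Bearer', access_token: `token-${exchangeCalls}` });
};

const refresher = new SingleFlight<Token>(fakeExchange);

// Four "simultaneous 401s" ask for a new token at the same time.
const pending = [refresher.run(), refresher.run(), refresher.run(), refresher.run()];
```

All four entries in `pending` are the same Promise, so `fakeExchange` runs once. After that Promise settles, the next `run()` starts a fresh exchange, which roughly matches how exhaustMap accepts a new trigger once the inner call has completed.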

A better solution would be to cache the refreshed token and only get a new one if the one in the cache is expired. This is a bit more involved because you need a mechanism to know which tokens have expired. We can use shareReplay(1) to keep a cache of the most recent token. We will also need to pass in the expired token so that we have something to compare our cached token against.

private _exchangeToken$ = new Subject<boolean>();

private _refreshToken$ = this._exchangeToken$.pipe(
  filter(x => x),
  exhaustMap(_ => this._authentication.exchangeRefreshToken()),
  share()
);

// Replays the most recent token; starts with null until the first exchange.
private _refreshTokenCache$ = this._refreshToken$.pipe(
  startWith(null),
  shareReplay(1)
);

exchangeRefreshToken(expiredToken: Token): Observable<Token> {
  const exchange = () => {
    const startToken$ = of(true).pipe(
      tap(x => this._exchangeToken$.next(x)),
      filter(_ => false)
    );
    // merge is the RxJS creation function; subscribe to the result before triggering.
    return merge(this._refreshToken$.pipe(first()), startToken$);
  };

  return this._refreshTokenCache$.pipe(
    first(),
    // Only exchange if nothing is cached or the cache still holds the expired token.
    switchMap(token =>
      token == null || token === expiredToken ?
      exchange() :
      of(token)
    )
  );
}

This isn't perfect because the cached token might still be expired even though it happens to be newer than the expired token being passed in. What you really want is a function that can read the payload of a token and check if it's expired.

The issue with this is that how this might be done depends on the token. Is it a JWT? Custom? If it's encrypted, asking the server may be the only way. If you know how long a token lasts, you can store fresh tokens with your own timestamp and use that to decide when to refresh.

private _refreshTokenCache$ = this._refreshToken$.pipe(
  map(token => ({
    token,
    timeStamp: Date.now()
  })),
  startWith(null), // seed the cache so the first read does not wait forever
  shareReplay(1)
);
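
As a rough sketch of how such a timestamp could be used, the helper below decides whether a cached entry is still young enough to reuse. `StampedToken`, `isFresh`, and the 30-minute `MAX_AGE_MS` are assumptions for illustration; the real lifetime depends on your auth server.

```typescript
// Hypothetical shape of a cache entry: the token plus the time it was stored.
interface StampedToken<T> {
  token: T;
  timeStamp: number; // milliseconds since the epoch, e.g. from Date.now()
}

// Returns true while the entry exists and is younger than maxAgeMs.
// `now` is a parameter so the check can be exercised with fixed times.
function isFresh<T>(
  entry: StampedToken<T> | null,
  maxAgeMs: number,
  now: number = Date.now()
): boolean {
  return entry !== null && now - entry.timeStamp < maxAgeMs;
}

// Assumed lifetime for illustration only: 30 minutes.
const MAX_AGE_MS = 30 * 60 * 1000;

const storedAt = 1000000;
const entry: StampedToken<string> = { token: 'abc', timeStamp: storedAt };

const stillFresh = isFresh(entry, MAX_AGE_MS, storedAt + 10 * 60 * 1000); // 10 minutes later
const expired = isFresh(entry, MAX_AGE_MS, storedAt + 45 * 60 * 1000);    // 45 minutes later
const nothingCached = isFresh<string>(null, MAX_AGE_MS);                  // before the first refresh
```

Inside exchangeRefreshToken, a check like `!isFresh(cached, MAX_AGE_MS)` could then stand in for the `token === expiredToken` comparison when deciding whether to call `exchange()`.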

Really, there isn't a one-size-fits-all solution to this.


An aside:

This might look somewhat strange:

of(true).pipe(
  tap(x => this._exchangeToken$.next(x)),
  filter(_ => false)
);

This is a stream that emits nothing and then completes. As a side effect though, it calls this._exchangeToken$.next(true).

So why not just call this._exchangeToken$.next(true) directly like this?

exchangeRefreshToken(): Observable<Token> {
  this._exchangeToken$.next(true);
  return this._refreshToken$.pipe(first());
}

We do it this way so that the side effect happens when the stream is subscribed to, rather than when it is created. That way, this should still work:

const exchange$ = this.exchangeRefreshToken();
setTimeout(() => { 
  exchange$.subscribe(token => console.log("I got a token: ", token))
}, 60 * 60 * 1000);

Here we get a stream, then wait an HOUR and subscribe for a refresh token. If we don't generate the token as part of the subscription, then we will have subscribed too late to get the token, and "I got a token" will never get printed to the console.


Related questions for more details/other approaches

  1. Angular 4 Interceptor retry requests after token refresh
  2. Angular: refresh token only once
  3. Handling refresh tokens using rxjs
Mrk Sef
  • Hey, thank you for spending time to code all this and find those previous answers; my googlefu must need some work because I didn't find those originally. I tried implementing the code you sent in. I was never able to get the code where you do the `of(true).pipe(` working in the exchangeRefreshToken function, but the straight up `_exchangeToken$.next, return _refreshToken.pipe(first())` worked. It seemed like the former code never even went in to the pipe part of it. – M.Chappell Nov 04 '20 at 15:16
  • Strange, it works for me. I did notice that I didn't initialize the token cache with a value. So a comparison to the cached value just locks this all up. I updated my answer to do that. It's only 3 extra lines of code. – Mrk Sef Nov 04 '20 at 16:43
  • Hm, still doesn't work. I'm stepping through the code trying to debug and it seems like it goes through the exchangeRefreshToken function just fine, but never hits the exchangeRefreshToken on the authorization service. I'm going to add my code into the bottom of my original post. Don't know if it makes a difference but I'm using Angular 10 and RxJS 6.6.0. Also to test this I have been deleting the token from the redis cache and refreshing the page, don't know if that makes a difference or not. – M.Chappell Nov 04 '20 at 19:24