Since I started with Angular 2, I have set up my services to return an Observable of T. In the service I would have the map() call, and components using these services would just use subscribe() to wait for the response. For these simple scenarios I didn't really need to dig into RxJS, so all was OK.

I now want to achieve the following: I am using OAuth2 authentication with refresh tokens. I want to build an API service that all other services will use, and that will transparently handle the refresh token when a 401 error is returned. So, in the case of a 401, I first fetch a new token from the OAuth2 endpoint, and then retry my request with the new token. Below is the code that works fine, with promises:

request(url: string, request: RequestOptionsArgs): Promise<Response> {
    var me = this;

    request.headers = request.headers || new Headers();
    var isSecureCall: boolean =  true; //url.toLowerCase().startsWith('https://');
    if (isSecureCall === true) {
        me.authService.setAuthorizationHeader(request.headers);
    }
    request.headers.append('Content-Type', 'application/json');
    request.headers.append('Accept', 'application/json');

    return this.http.request(url, request).toPromise()
        .catch(initialError => {
            if (initialError && initialError.status === 401 && isSecureCall === true) {
                // token might be expired, try to refresh token. 
                return me.authService.refreshAuthentication().then((authenticationResult:AuthenticationResult) => {
                    if (authenticationResult.IsAuthenticated == true) {
                        // retry with new token
                        me.authService.setAuthorizationHeader(request.headers);
                        return this.http.request(url, request).toPromise();
                    }
                    return <any>Promise.reject(initialError);
                });
            }
            else {
                return <any>Promise.reject(initialError);
            }
        });
}

In the code above, authService.refreshAuthentication() will fetch the new token and store it in localStorage. authService.setAuthorizationHeader will set the 'Authorization' header to the previously updated token. If you look at the catch method, you'll see that it returns a promise (for the refresh token) that in turn will eventually return another promise (for the actual second try of the request).

I have attempted to do this without resorting to promises:

request(url: string, request: RequestOptionsArgs): Observable<Response> {
    var me = this;

    request.headers = request.headers || new Headers();
    var isSecureCall: boolean =  true; //url.toLowerCase().startsWith('https://');
    if (isSecureCall === true) {
        me.authService.setAuthorizationHeader(request.headers);
    }
    request.headers.append('Content-Type', 'application/json');
    request.headers.append('Accept', 'application/json');

    return this.http.request(url, request)
        .catch(initialError => {
            if (initialError && initialError.status === 401 && isSecureCall === true) {
                // token might be expired, try to refresh token
                return me.authService.refreshAuthenticationObservable().map((authenticationResult:AuthenticationResult) => {
                    if (authenticationResult.IsAuthenticated == true) {
                        // retry with new token
                        me.authService.setAuthorizationHeader(request.headers);
                        return this.http.request(url, request);
                    }
                    return Observable.throw(initialError);
                });
            }
            else {
                return Observable.throw(initialError);
            }
        });
}

The code above does not do what I expect: in the case of a 200 response, it properly returns the response. However, if it catches the 401, it will successfully retrieve the new token, but the subscribe will eventually receive an Observable instead of the response. I'm guessing this is the unexecuted Observable that should do the retry.

I realize that translating the promise way of working onto the RxJS library is probably not the best way to go, but I haven't been able to grasp the "everything is a stream" thing. I have tried a few other solutions involving flatMap, retryWhen, etc., but didn't get far, so some help is appreciated.

Davy

3 Answers

From a quick look at your code I would say that your problem seems to be that you are not flattening the Observable that is returned from the refresh service.

The catch operator expects that you will return an Observable that it will concatenate onto the end of the failed Observable, so that the downstream Observer doesn't know the difference.

In the non-401 case you are doing this correctly by returning an Observable that rethrows the initial error. However, in the refresh case you are returning an Observable that produces more Observables instead of single values.

I would suggest you change the refresh logic to be:

    return me.authService
        .refreshAuthenticationObservable()
        // Use flatMap instead of map
        .flatMap((authenticationResult: AuthenticationResult) => {
            if (authenticationResult.IsAuthenticated == true) {
                // retry with new token
                me.authService.setAuthorizationHeader(request.headers);
                return this.http.request(url, request);
            }
            return Observable.throw(initialError);
        });

flatMap will convert the intermediate Observables into a single stream.
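
To make the difference concrete, here is a minimal sketch that uses a toy, synchronous `Stream` class as a stand-in for an RxJS Observable. `Stream`, `refresh` and `retry` are hypothetical names invented for this illustration; they are not part of RxJS or of the question's code. In real RxJS code, `mergeMap` (alias `flatMap`) would play the role of the toy `flatMap` below.

```typescript
// Toy, synchronous stand-in for an Observable (NOT RxJS): just enough to show flattening.
type Observer<T> = { next: (value: T) => void; error: (err: unknown) => void };

class Stream<T> {
  constructor(private readonly producer: (observer: Observer<T>) => void) {}

  // Emits the given values synchronously to each subscriber.
  static of<T>(...values: T[]): Stream<T> {
    return new Stream<T>(o => values.forEach(v => o.next(v)));
  }

  subscribe(observer: Observer<T>): void {
    this.producer(observer);
  }

  // map: forwards whatever the projection returns, even if it is itself a Stream.
  map<R>(project: (value: T) => R): Stream<R> {
    return new Stream<R>(o =>
      this.subscribe({ next: v => o.next(project(v)), error: e => o.error(e) }));
  }

  // flatMap: subscribes to the inner Stream and forwards its values instead.
  flatMap<R>(project: (value: T) => Stream<R>): Stream<R> {
    return new Stream<R>(o =>
      this.subscribe({ next: v => project(v).subscribe(o), error: e => o.error(e) }));
  }
}

// Hypothetical stand-ins for the token refresh and the retried HTTP request.
const refresh = Stream.of({ IsAuthenticated: true });
const retry = (): Stream<string> => Stream.of('200 OK');

const viaMap: unknown[] = [];
refresh.map(() => retry())
  .subscribe({ next: v => viaMap.push(v), error: () => {} });

const viaFlatMap: string[] = [];
refresh.flatMap(() => retry())
  .subscribe({ next: v => viaFlatMap.push(v), error: () => {} });

console.log(viaMap[0] instanceof Stream); // true: the subscriber was handed a Stream
console.log(viaFlatMap);                  // [ '200 OK' ]: the subscriber got the response
```

With `map`, the subscriber is handed the inner stream itself, which matches the symptom described in the question. With `flatMap`, it receives the retried response.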

paulpdaniels
  • Ha, this works :) I've tried using the flatMap method, but apparently not in the correct way. Thanks Paul! PS: what I do find quite messy in RxJS is that they tend to hide these methods in files with different names. For the flatMap method to work, I had to import the "mergeMap" file ... – Davy Jan 21 '16 at 16:28
  • @Davy May I ask which version you are using? If it is the project from ReactiveX it is still currently in beta so the documentation is still a little lacking while all the functionality is being completed. – paulpdaniels Jan 21 '16 at 16:57
  • I have been following this solution in combination with extending http, but am running into an issue where I run multiple GET calls, they all return unauthorized, and then they all try to refresh the tokens simultaneously. The first call works, but all the other calls are trying to do the same with the same refresh token, resulting in a backend error. What is the solution with observables to accomplish "one GET call was unauthorized, is already getting new tokens, so wait for that to return before attempting to get new tokens"? Ideally refresh token would be called once and then other get calls use – Kevin Quiring Mar 22 '17 at 18:09
  • @paulpdaniels As stated in another answer, `flatMap` has been renamed to `mergeMap` – C.Champagne Mar 16 '20 at 15:31
  • @C.Champagne `flatMap` is just an alias of `mergeMap`, I tend to prefer the former, which is why I used it, but they are functionally equivalent. – paulpdaniels Mar 17 '20 at 11:00

In RxJS 5 and later, the flatMap operator is named mergeMap; flatMap is kept only as an alias for it.

msanford
mostefaiamine

I created this demo to figure out how to handle refresh token using rxjs. It does this:

  • Makes an API call with the access token.
  • If the access token has expired (the observable throws the appropriate error), it makes another async call to refresh the token.
  • Once the token is refreshed, it retries the API call.
  • If there is still an error, it gives up.

This demo does not make actual HTTP calls (it simulates them using Observable.create).

Instead, use it to learn how to use catchError and retry operators to fix a problem (access token failed the first time), then retry the failed operation (the API call).

kctang
  • thanks for the demo @kctang, I haven't tried it, but it seems interesting :) – nescafe Jun 10 '18 at 17:01
  • Very useful example, thank you! Do you have a similar example that uses queueing? So when a refresh is underway, queue all other requests until the new access token is available? The way it works now is every request will perform a refresh, but that isn't a good idea - only one refresh should be performed, and the other requests should be queued during that time... – lonix Jun 27 '19 at 09:18
  • That's a good point, @ionix. A bit closer to edge cases but still a valid scenario. Unfortunately I don't have an example of that for now. – kctang Jun 27 '19 at 16:41
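
The comments raise the case where several requests fail with a 401 at once and each one starts its own refresh. One possible approach, sketched here with plain promises rather than Observables, is to keep a reference to the refresh that is in progress and hand that same promise to every caller. `refreshToken`, `refreshOnce` and `inFlight` are hypothetical names, and `refreshToken` is a stub standing in for a real call such as the question's `authService.refreshAuthentication()`.

```typescript
// Hypothetical stub for the real refresh call; it counts how often it runs.
let refreshCalls = 0;
function refreshToken(): Promise<string> {
  refreshCalls += 1;
  return Promise.resolve(`token-${refreshCalls}`);
}

// The refresh currently in progress, if any.
let inFlight: Promise<string> | null = null;

// Returns the pending refresh when one exists, otherwise starts a new one.
function refreshOnce(): Promise<string> {
  if (inFlight !== null) {
    return inFlight;
  }
  const pending = refreshToken();
  const clear = (): void => { inFlight = null; };
  // Clear the slot once the refresh settles (success or failure),
  // so that a later 401 can trigger a new refresh.
  pending.then(clear, clear);
  inFlight = pending;
  return pending;
}

// Three requests that hit a 401 at the same time share a single refresh.
const waiting = [refreshOnce(), refreshOnce(), refreshOnce()];
Promise.all(waiting).then(tokens => console.log(tokens, refreshCalls));
```

Each caller would then retry its own request once the shared promise resolves. An RxJS variant could share a single refresh Observable among subscribers instead, for example with the `share` operator; that variant is not shown here.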