I have subscribed to a timer that produces an event every n seconds:

Observable.interval(1000)
  .startWith(0)
  .map(x => { return 'USER'; })

I also have another observable whose results are not available from the very beginning; it takes some time to resolve. The timer events accumulate, and when the other observable finally fires I get a flood of requests.

 .zip(tokenService.token, (privilege: string, token: Token) => {
       /* get request body */ })
 .flatMap((body: any) => { /* perform refresh request */ })
 .map(x => { return x.json(); })
 .subscribe(json => {
       let token = json['token'];
       tokenService.setToken(token);
 });

Is there a way to keep only the last event from the timer and discard the rest?

.last() does not work for me: it returns only one event and then nothing more, so I never see the subsequent timer events.

Maybe this is not the right approach to my problem? I want to refresh the token every n seconds, and only if I currently hold a valid token (right now a service provides an Observable<Token>).

Edit: Ok, I found out this is called backpressure, and there is an article about it: https://github.com/ReactiveX/RxJava/wiki/Backpressure

Question still stands though.

charlie_pl
  • Questions like this are really difficult to help with without a clear problem statement, a bin, and/or a marble diagram. Here's a Bin. Using the Zip operator, there's no "flood of requests." Please be more specific. http://jsbin.com/sezanam/edit?js,console – D. Walsh Dec 02 '16 at 16:24

1 Answer

You basically want to re-trigger an event based on the current state of an Observable. If the token is valid and some time has passed, a new token should be created; otherwise nothing should happen.

See the following example on jsbin that provides some sample code of how this can be done. Use the button to create a new valid token. From then on every second a new token will be generated (which is valid for five seconds). Once you invalidate the token via the invalidate button the generation of new tokens will stop.

function createToken() {
  // Here you would do some (async) things to get a valid token.
  return Promise.resolve({ validUntil: new Date(+new Date() + 5000) });
}

function isTokenValid(token) {
  const date = new Date();
  return token.validUntil > date;
}

// Subject holds copy of latest token.
const token$ = new Rx.ReplaySubject(1);

// Defines the time interval to periodically query new tokens.
const period$ = Rx.Observable.interval(1000);

period$
  .withLatestFrom(token$, (p, token) => {
    if (isTokenValid(token)) {
      createToken()
        .then(token => token$.next(token));
    }
    return token;
  })
  .filter(token => isTokenValid(token))
  .subscribe(x => console.log(x));

// Button stream that invalidates token
const invalidateBtn$ = Rx.Observable.fromEvent(
  document.getElementById('invalidateBtn'), 'click')
  .subscribe(() => {
    token$.next({ validUntil: new Date(0) });
  });

// Button stream triggers creation of first valid token
const createBtn$ = Rx.Observable.fromEvent(
  document.getElementById('createBtn'), 'click')
  .subscribe(() => {
    createToken()
      .then((token) => token$.next(token));
  });
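
One plausible reading of the flood described in the question is that zip buffers the timer ticks emitted before the first token arrives, so every refreshed token is immediately paired with a stale tick. The following is a minimal sketch in plain JavaScript (no RxJS) that models only the pairing rules of zip and withLatestFrom under that assumption. The function names and all timings (first token at 4.5 s, 100 ms per refresh) are hypothetical and chosen purely for illustration.

```javascript
// Plain-JavaScript model of the two pairing rules (no RxJS needed).
// All names and timings here are hypothetical, chosen for illustration.
const PERIOD = 1000;         // timer tick every second, first tick at t = 0
const FIRST_TOKEN_AT = 4500; // the first token resolves late
const REFRESH_DELAY = 100;   // each refresh yields a new token 100 ms later
const HORIZON = 7000;        // stop simulating after 7 seconds

// zip pairs the i-th tick with the i-th token, buffering whichever side
// runs ahead. A request fires when the later of the two has arrived.
function zipRequestTimes() {
  const times = [];
  let tokenAt = FIRST_TOKEN_AT;
  for (let i = 0; ; i++) {
    const tickAt = i * PERIOD;
    const requestAt = Math.max(tickAt, tokenAt);
    if (requestAt > HORIZON) break;
    times.push(requestAt);
    tokenAt = requestAt + REFRESH_DELAY; // the refresh feeds the next token back in
  }
  return times;
}

// withLatestFrom fires only on a tick, and only once some token has been
// seen; ticks before the first token are dropped instead of buffered.
function withLatestFromRequestTimes() {
  const times = [];
  for (let tickAt = 0; tickAt <= HORIZON; tickAt += PERIOD) {
    if (tickAt >= FIRST_TOKEN_AT) times.push(tickAt);
  }
  return times;
}

console.log(zipRequestTimes());
// [ 4500, 4600, 4700, 4800, 4900, 5000, 6000, 7000 ]
console.log(withLatestFromRequestTimes());
// [ 5000, 6000, 7000 ]
```

In this model the zip variant fires a burst of requests 100 ms apart until the buffered ticks are used up, while the withLatestFrom variant stays at one request per period. The model ignores token validity, which the filter step above handles.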
dotcs
  • zip was a bad idea; withLatestFrom + filter seems to fix all my problems. I wonder how I didn't find it, I thought I'd tried everything. It seems there is an even cooler way to do this though: http://stackoverflow.com/questions/28945061/how-can-i-clear-the-buffer-on-a-replaysubject ReplaySubject can expire the token automatically, and also store only one token inside. A thousand thanks for your invaluable help! – charlie_pl Dec 02 '16 at 20:29
  • Glad that it worked out for you. Keep smiling, observables are sometimes hard to wrap one's head around. But they're so powerful! :) – dotcs Dec 02 '16 at 20:33