8

I'm trying to use RxJS for a simple short poll. It needs to make a request once every delay seconds to the location path on the server, ending once one of two conditions is reached: either the callback isComplete(data) returns true, or it has tried the server maxTries times. Here's the basic code:

newShortPoll(path, maxTries, delay, isComplete) {
    return Observable.interval(delay)
    .take(maxTries)
    .flatMap((tryNumber) => http.get(path))
    .doWhile((data) => !isComplete(data));
  }

However, doWhile doesn't exist in RxJS 5.0. The maxTries limit works thanks to the take() call, but the isComplete condition does not. How can I make the observable next() values until isComplete returns true, at which point it should next() that value and then complete()?

I should note that takeWhile() does not work for me here. It does not return the last value, which is actually the most important, since that's when we know it's done.

Thanks!

Colton Voege

4 Answers

3

We can create a utility function to create a second Observable that emits every item that the inner Observable emits; however, we will call the onCompleted function once our condition is met:

function takeUntilInclusive(inner$, predicate) {
    return Rx.Observable.create(observer => {
        var subscription = inner$.subscribe(item => {
            observer.onNext(item);

            if (predicate(item)) {
                observer.onCompleted();
            }
        // Wrap the callbacks so the observer methods keep their `this` binding.
        }, err => observer.onError(err), () => observer.onCompleted());


        return () => {
            subscription.dispose();
        }
    });
}

And here's a quick snippet using our new utility method:

const inner$ = Rx.Observable.range(0, 4);
const data$ = takeUntilInclusive(inner$, (x) => x > 2);
data$.subscribe(x => console.log(x));

// >> 0
// >> 1
// >> 2
// >> 3

This answer is based on the question "RX Observable.TakeWhile checks condition BEFORE each element but I need to perform the check after".
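To make the "inclusive" semantics concrete without any RxJS dependency, here is a minimal sketch over a plain iterable. The generator name takeUntilInclusiveSync is hypothetical and not part of RxJS; it only mirrors the idea above: emit every item, and stop right after the first one that satisfies the predicate.

```javascript
// Hypothetical helper (not part of RxJS): yields items from `source`
// up to and including the first item for which `predicate` returns true.
function* takeUntilInclusiveSync(source, predicate) {
  for (const item of source) {
    yield item;               // always emit the current item first
    if (predicate(item)) {
      return;                 // then stop once the condition is met
    }
  }
}

const emitted = [...takeUntilInclusiveSync([0, 1, 2, 3, 4, 5], (x) => x > 2)];
console.log(emitted); // [ 0, 1, 2, 3 ]
```

Note that the matching item (3) is part of the output, which is the behaviour the question asks for; a plain exclusive take-while would stop at 2.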

Calvin Belden
  • Oddly, this solution seems to work about half the time. Sometimes the final batch of data makes it to the subscribers, sometimes it just doesn't. Any idea why? Switching to switchMap did not fix it either. – Colton Voege Mar 04 '16 at 23:07
  • This is not working properly because "onNext" is always called. The code inside the observable should look like: if (predicate(item)) { observer.complete(); } else { observer.next(item); } – Mário Kapusta Jun 18 '18 at 07:38
1

You can achieve this with the retry and first operators.

// helper observable that can return incomplete/complete data or fail.
var server = Rx.Observable.create(function (observer) {
  var x = Math.random();

  if(x < 0.1) {
    observer.next(true);
  } else if (x < 0.5) {
    observer.error("error");
  } else {
    observer.next(false);
  }
  observer.complete();

  return function () {
  };
});
   
function isComplete(data) {
  return data;
}
  
var delay = 1000;
Rx.Observable.interval(delay)
  .switchMap(() => {
    return server
      .do((data) => {
        console.log('Server returned ' + data);
      }, () => {
        console.log('Server threw');
      })
      .retry(3);
  })
  .first((data) => isComplete(data))
  .subscribe(() => {
    console.log('Got completed value');
  }, () => {
    console.log('Got error');
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
Sergey Sokolov
0

It's an old question, but I also had to poll an endpoint and arrived at this question. Here's my own doWhile operator I ended up creating:

import { pipe, from } from 'rxjs';
import { switchMap, takeWhile, filter, map } from 'rxjs/operators';

export function doWhile<T>(shouldContinue: (a: T) => boolean) {
  return pipe(
    switchMap((data: T) => from([
      { data, continue: true },
      { data, continue: shouldContinue(data), exclude: true }
    ])),
    takeWhile(message => message.continue),
    filter(message => !message.exclude),
    map(message => message.data)
  );
}

It's a little weird, but it works for me so far. You could use it together with take(), as you were trying to do.

Michael Pearson
0

I was googling for a do-while behavior and found this question. Then I found out that takeWhile takes a second parameter, an inclusive boolean. So maybe you can do this:

takeWhile((data) => !isComplete(data), true)
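
For comparison, the stopping rule from the question can also be sketched without RxJS as a plain async loop. This is only an illustration under assumptions: fetchOnce is a hypothetical function returning a promise of the response (standing in for http.get(path)), sleep is a hypothetical helper, delayMs is in milliseconds, and the function resolves with every value received, including the final one.

```javascript
// Hypothetical helper: resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Polls `fetchOnce` at most `maxTries` times, waiting `delayMs` before each try.
// Stops early, right after the first response for which `isComplete` is true.
async function shortPoll(fetchOnce, maxTries, delayMs, isComplete) {
  const received = [];
  for (let tryNumber = 0; tryNumber < maxTries; tryNumber++) {
    await sleep(delayMs);
    const data = await fetchOnce();
    received.push(data);        // keep every value, including the final one
    if (isComplete(data)) {
      break;                    // stop right after the completing value
    }
  }
  return received;
}

// Usage with a fake server that reports "done" on the third call.
let calls = 0;
const fakeFetch = async () => (++calls >= 3 ? "done" : "pending");

const result = shortPoll(fakeFetch, 5, 10, (data) => data === "done");
result.then((values) => console.log(values)); // [ 'pending', 'pending', 'done' ]
```

The completing value is the last element of the result, which matches what the inclusive flag is meant to give you in the observable version.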
techguy2000