38

I have an RxJS 5 pipeline that looks like this:

Rx.Observable.from([2, 3, 4, 5, 6])
  .takeWhile((v) => v !== 4)

I want to keep the subscription until I see 4, but I also want the last element, 4, to be included in the result. So the example above should output

2, 3, 4

However, according to the official documentation, the takeWhile operator is not inclusive: when it encounters the first element that doesn't match the predicate, it completes the stream immediately without emitting that element. As a result, the above code will actually output

2, 3

So my question is, what's the easiest way I can achieve takeWhile but also emit the last element with RxJS?
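The difference between the two behaviours can be sketched in plain TypeScript on ordinary arrays instead of Observables. This is only a model of the semantics, not RxJS code; `takeWhileExclusive` and `takeWhileInclusive` are hypothetical helper names introduced for illustration.

```typescript
// Model of the documented behaviour: stop *before* the first value
// that fails the predicate.
function takeWhileExclusive<T>(source: T[], predicate: (v: T) => boolean): T[] {
  const out: T[] = [];
  for (const v of source) {
    if (!predicate(v)) break;
    out.push(v);
  }
  return out;
}

// Model of the desired behaviour: keep the first failing value, then stop.
function takeWhileInclusive<T>(source: T[], predicate: (v: T) => boolean): T[] {
  const out: T[] = [];
  for (const v of source) {
    out.push(v);
    if (!predicate(v)) break;
  }
  return out;
}

const input = [2, 3, 4, 5, 6];
const exclusive = takeWhileExclusive(input, v => v !== 4); // [2, 3]
const inclusive = takeWhileInclusive(input, v => v !== 4); // [2, 3, 4]
console.log(exclusive, inclusive);
```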

Fang-Pen Lin
  • Somewhat facetious, but the easiest way would be `.takeWhile(v => v < 5)` – cartant Jun 19 '17 at 23:23
  • More seriously: https://github.com/martinsik/rxjs-extra/blob/master/src/operator/takeWhileInclusive.ts – cartant Jun 19 '17 at 23:26
  • worked for me https://github.com/MatthiasKunnen/rxjs-take-while-inclusive – srghma Dec 25 '18 at 16:54
  • Possibly not helpful, but relevant (it was what I ended up using after finding this post): RxJava [has a `takeUntil` method which takes a predicate](http://reactivex.io/RxJava/javadoc/rx/Observable.html#takeUntil-rx.functions.Func1-). The difference with `takeWhile` is that it _does_ include the last item. – Steven Jeuris Jan 04 '19 at 10:15

6 Answers

56

Since RxJS 6.4.0 this is now possible with takeWhile(predicate, true).

The optional inclusive parameter was proposed in this PR: https://github.com/ReactiveX/rxjs/pull/4115

For earlier versions, there are at least two possible workarounds:

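  1. Using concatMap():

    import { of } from 'rxjs';
    import { concatMap, takeWhile } from 'rxjs/operators';

    of('red', 'blue', 'green', 'orange').pipe(
      concatMap(color => {
        if (color === 'green') {
          // Emit the last value followed by a null sentinel
          return of(color, null);
        }
        return of(color);
      }),
      // Complete on the null sentinel (relies on every real value being truthy)
      takeWhile(color => color),
    )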
    
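  2. Using multicast():

    import { of, ReplaySubject } from 'rxjs';
    import { concat, multicast, take, takeWhile } from 'rxjs/operators';

    of('red', 'blue', 'green', 'orange').pipe(
      multicast(
        () => new ReplaySubject(1),
        subject => subject.pipe(
          takeWhile((c) => c !== 'green'),
          // After takeWhile completes, replay the last value from the ReplaySubject
          concat(subject.pipe(take(1))),
        )
      ),
    )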
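One caveat with the first workaround: `takeWhile(color => color)` stops at the first falsy value, so it assumes the source never emits `0`, `''`, `false`, `null` or `undefined` itself. The sketch below models this in plain TypeScript on arrays (no RxJS involved); `sentinelTake` is a hypothetical helper name used only for illustration.

```typescript
// Plain-array model of workaround 1: add a null sentinel after the
// stop value, then keep values while they are truthy.
function sentinelTake<T>(source: T[], isLast: (v: T) => boolean): T[] {
  // Step 1 (models concatMap): expand each value, adding null after the stop value.
  const expanded: Array<T | null> = [];
  for (const v of source) {
    expanded.push(v);
    if (isLast(v)) expanded.push(null);
  }
  // Step 2 (models takeWhile(v => v)): stop at the first falsy value.
  const out: T[] = [];
  for (const v of expanded) {
    if (!v) break;
    out.push(v);
  }
  return out;
}

const colors = sentinelTake(['red', 'blue', 'green', 'orange'], c => c === 'green');
// colors: ['red', 'blue', 'green']

const numbers = sentinelTake([2, 0, 4, 5], n => n === 4);
// numbers: [2], because 0 is falsy and ends the sequence before 4 is reached
console.log(colors, numbers);
```

Comparing against the sentinel explicitly (for example `color !== null`) instead of relying on truthiness avoids this, provided the source never emits `null` itself.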
    

I've been using this operator as well, so I added it to my own set of additional RxJS 5 operators: https://github.com/martinsik/rxjs-extra#takewhileinclusive

This operator has also been discussed in this RxJS 5 issue: https://github.com/ReactiveX/rxjs/issues/2420

Jan 2019: Updated for RxJS 6

martin
19

UPDATE March 2019, RxJS version 6.4.0: takeWhile finally has an optional inclusive parameter that keeps the first element that breaks the condition. So now the solution is simply to pass true as the second argument of takeWhile:

import { takeWhile } from 'rxjs/operators';
import { from } from 'rxjs';

const cutOff = 4.5;
const example = from([2, 3, 4, 5, 6]).pipe(
  takeWhile(v => v < cutOff, true)
);
const subscribe = example.subscribe(val =>
  console.log('inclusive:', val)
);

outputs:

inclusive: 2
inclusive: 3
inclusive: 4
inclusive: 5

Live here:

https://stackblitz.com/edit/typescript-m7zjkr?embed=1&file=index.ts

Notice that 5 is the first element that breaks the condition. Also notice that endWith is not really a solution when you have dynamic conditions like v < cutOff and you don't know what your last element will be.

Thanks @martin for pointing out the existence of this pull request.

Batato
4

You can use endWith(value) which (unlike a lot of RxJS code) is very nicely self documenting.

const example = source.pipe(
  takeWhile(val => val != 4),
  endWith(4)
);

PS. Also note that takeUntil doesn't take a predicate, so if you were trying to use that operator to solve this problem you can't. It's a whole different method signature.

Official docs: https://rxjs-dev.firebaseapp.com/api/operators/endWith

https://stackblitz.com/edit/typescript-pvuawt

Simon_Weaver
  • **Watch out - not all operators are on learn-rxjs.io, so it's not currently the best place to learn** – Simon_Weaver Aug 22 '18 at 19:10
  • Directly answering the asked question, where the final value is known, I like this as the most simple and straightforward. Of course as others have noted, the addition of an inclusive parameter to the operator in v6.4 is ideal. – reads0520 Apr 09 '19 at 13:32
3

If your comparison is such that you know exactly what the last element is (as with !==), you can re-add it yourself:

Rx.Observable.from([2, 3, 4, 5, 6])
  .takeWhile((v) => v !== 4)
  .concat(Rx.Observable.of(4))
  .subscribe(console.log)
2

I came across the same problem: I needed the last element to be included, so I chose to keep a reference to the subscription and unsubscribe within the onNext callback when the condition was met. Using your example code it would be:

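const subscription = Observable.of('red', 'blue', 'green', 'orange')
  .subscribe(color => {
    // Do something with the value here
    if (color === 'green') {
      // Caveat: Observable.of emits synchronously, so `subscription` is not
      // yet assigned when this runs; the pattern needs an asynchronous source.
      subscription.unsubscribe()
    }
  })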

This worked for me because it also caused the observable source to stop emitting, which is what I needed in my scenario. I realize that I'm not using the takeWhile operator, but the main goal is achieved without any workarounds or extra code. I'm not a fan of forcing things to work in a way they were not designed to. The disadvantages of this are:

  • If there are any other observers subscribed, the source will keep emitting.
  • The onCompleted callback does not get called for some reason if the last observer unsubscribes, but I checked that the source does in fact stop emitting.
Mauri Q
1

In my case, I was unable to predict what the final value would be. I also just wanted a solution involving common, easy operators, and I wanted something I could reuse, so I couldn't rely on the values being truthy. The only thing I could think of was defining my own operator like this:

import { pipe, from } from 'rxjs';
import { switchMap, takeWhile, filter, map } from 'rxjs/operators';

export function doWhile<T>(shouldContinue: (a: T) => boolean) {
  return pipe(
    switchMap((data: T) => from([
      { data, continue: true },
      { data, continue: shouldContinue(data), exclude: true }
    ])),
    takeWhile(message => message.continue),
    filter(message => !message.exclude),
    map(message => message.data)
  );
}

It's a little weird, but it works for me and I can import it and use it.
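To see why this works, the message flow can be traced without RxJS. The sketch below is a plain-array model of the same idea (each value becomes a data message followed by a control message); `doWhileModel` and the `Message` interface are hypothetical names for illustration and are not part of the operator above.

```typescript
interface Message<T> {
  data: T;
  continue: boolean; // false on a control message means "stop here"
  exclude: boolean;  // true marks a control message that must not be emitted
}

function doWhileModel<T>(source: T[], shouldContinue: (a: T) => boolean): T[] {
  const out: T[] = [];
  for (const data of source) {
    // Each value expands into a data message and a control message.
    const messages: Message<T>[] = [
      { data, continue: true, exclude: false },
      { data, continue: shouldContinue(data), exclude: true },
    ];
    for (const message of messages) {
      if (!message.continue) return out;            // models takeWhile
      if (!message.exclude) out.push(message.data); // models filter + map
    }
  }
  return out;
}

const result = doWhileModel([2, 3, 4, 5, 6], v => v !== 4);
// result: [2, 3, 4]

const withFalsy = doWhileModel([0, 1, 2], v => v < 1);
// withFalsy: [0, 1], so falsy values such as 0 pass through unharmed
console.log(result, withFalsy);
```

Because the stop signal travels in a separate control message, the data values themselves never need to be truthy.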

Michael Pearson