37

I have two observables:

  1. An observable representing a list of checkbox inputs.
  2. An observable representing a stream of events coming from the server.

I'd like to filter the second observable using values from the first one.

The values received from the server include a tag property, which corresponds to values in the checkbox list. The observable resulted from the combination of the above two would only yield values from the server whose tag property is included in the set of ticked checkboxes.

Gajus
  • 69,002
  • 70
  • 275
  • 438
Ionuț G. Stan
  • 176,118
  • 18
  • 189
  • 202

4 Answers4

44

You can use withLatestFrom. enter image description here.

source.withLatestFrom(checkboxes, (data, checkbox) => ({data, checkbox}))
  .filter(({data, checkbox}) => ...)

Here, checkboxes is an observable representing a list of checkbox inputs. source is an observable representing a stream of events coming from the server. In the filter function you can check if the data is valid compared to the checkbox settings and let it trough.

Notice it is important checkboxes emits at least 1 value before the stream can emit anything.

Ps. In regard to other answers, this solution works even if the source is cold.

Dorus
  • 7,276
  • 1
  • 30
  • 36
  • 1
    @IonuțG.Stan Indeed. And it it surprising how many people still refer to these old questions, so hopefully it'll help a few people :) – Dorus Aug 25 '16 at 09:28
  • You should note that this is is annotated with @Experimental, and could change **significantly** at any time, so it should not be used or relied upon in production code. – acrespo Apr 25 '17 at 23:41
  • @acrespo This is use in production code in many places. Where exactly did you find this @ Experimental tag? – Dorus Apr 26 '17 at 13:15
  • Oh I'm sorry, I arrived at the question looking for some RxJava insight (same question but for java), and I didn't notice the `rxjs` tag. So I added that comment because the `withLatestFrom` operator is annotated with `@Experimental` in the [java implementation](https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/Observable.java#L12068) (assuming version 1.X of RxJava). My bad! – acrespo Apr 26 '17 at 14:33
  • @acrespo As far as i can see [`withLatestFrom`](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html#withLatestFrom(java.lang.Iterable,%20io.reactivex.functions.Function)) doesn't have the experimental tag in RxJava 2. – Dorus Apr 26 '17 at 14:36
  • Yeah, I know, [I don't see it in the source code neither](https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFrom.java). Just wanted to note that for RxJava 1, which is still actively maintained and widely used. – acrespo Apr 26 '17 at 20:34
  • Tried to upvote this but looks like I'd downvoted it 9 months ago. Probably a mistake - sorry! – Simon_Weaver Jan 18 '18 at 07:37
  • @Simon_Weaver AhA! I was wondering about that downvote for 9 months now :-) – Dorus Jan 18 '18 at 12:00
6

In order to filter stream A using values of stream B, you need to observe stream B and use the latest values to filter stream A.

Use switch() to transform B observable to an observable producing values from A observable.

checkedInputValuesSource
    .map(function (options) {
        return dataSource
            .filter(function (value) {
                return options.indexOf(value) !== -1;
            });
    })
    .switch()
    .subscribe(function (x) {
        console.log('out: ' + x);
    });

Using switch() assumes that dataSource is a hot observable.

Example using interval() to produce dummy data:

var input,
    checkedInputValuesSource,
    dataSource;

input = document.querySelectorAll('input');

// Generate source describing the current filter.
checkedInputValuesSource = Rx.Observable
    .fromEvent(input, 'change')
    .map(function () {
        var inputs = document.querySelectorAll('input'),
            checkedInputValues = [];
        
        [].forEach.call(inputs, function (e) {
            if (e.checked) {
                checkedInputValues.push(e.value);
            }
        });
        
        return checkedInputValues;
    })
    .startWith([]);

// Generate random data source (hot).
dataSource = Rx.Observable
    .interval(500)
    .map(function () {
        var options = ['a', 'b', 'c'];
    
        return options[Math.floor(Math.floor(Math.random() * options.length))];
    })
    .do(function (x) {
        console.log('in: ' + x);
    })
    .share();

checkedInputValuesSource
    .map(function (options) {
        return dataSource
            .filter(function (value) {
                return options.indexOf(value) !== -1;
            });
    })
    .switch()
    .subscribe(function (x) {
        console.log('out: ' + x);
    });
<script src='https://rawgit.com/Reactive-Extensions/RxJS/v.2.5.3/dist/rx.all.js'></script>

<input type='checkbox' value='a'>
<input type='checkbox' value='b'>
<input type='checkbox' value='c'>

This example will produce output similar to:

in: c
in: a
out: a
in: b
in: c
out: a
in: b
in: a

Where in reflects all generated input and b the data that passes the filter. Filter is adjusted by checking the checkbox inputs, that reflect values "a", "b" and "c".

Gajus
  • 69,002
  • 70
  • 275
  • 438
  • Isn't this exactly what I wrote in my solution? – Ionuț G. Stan Aug 03 '15 at 18:04
  • 2
    Might be. I am going through each rxjs question/answer and contribute whenever I can. Your answer links to an external resource and requires additional dependencies to run. I have written down a solution in a sandboxed environment. – Gajus Aug 03 '15 at 19:25
  • 2
    Notice you can make this work even if the source is cold. With [`publish(...)`](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/publish.md). Like this: `dataSource .publish(dsrc => checkedInputValuesSource .switchMap((options) => dsrc .filter((value) => options.indexOf(value) !== -1) ) )` – Dorus Aug 24 '16 at 22:09
2

Apparently, what I needed was a combination of select, filter and switchLatest. I've written a small test case demonstrating this: https://gist.github.com/igstan/d5b8db7b43f49dd87382#file-observable-filter-observable-js-L36-L45

Ionuț G. Stan
  • 176,118
  • 18
  • 189
  • 202
0

Expanding on the answer from @Dorus... In Kotlin, you can do it like so:

val observable: Observable<Data> = ...
val filter: Observable<Checkbox> = ...
val filtered: Observable<Data> =
        observable.filterWithLatestFrom(filter) { checkbox -> checkbox.isSelected }

Using the extension function:

/**
 * Return an [Observable] with type [T1] that is filtered using the last event emitted by the [other] observable.
 */
fun <T1 : Any, T2 : Any> Observable<T1>.filterWithLatestFrom(other: Observable<T2>, filterFunction: (T2) -> Boolean)
: Observable<T1> {
    return this.withLatestFrom(other) { obs1, obs2 -> Pair(obs1, obs2) }
        .filter { (_, obs2) -> filterFunction.invoke(obs2) }
        .map { (obs1, _) -> obs1}
}
Jeff McKnight
  • 71
  • 1
  • 3