3

How do I define operators on an RX subject? I want to be able to throttle/take/debounce a data stream without changing the original source stream. ive tried the following (broken) implementation. any help appreciated.

var source = Rx.Observable.interval(1000).take(10);

var subject = new Rx.Subject();
source.subscribe(subject);

var subscriber1 = subject.subscribe(
    function (x) { $("#result1").append('next: ' + x + '<br>'); },
    function (e) { $("#result1").append('onError: ' + e.message); },
    function () { $("#result1").append('onCompleted'); });

var modified = new Rx.Subject().take(2); // ... or throttle, or debounce etc

source.subscribe(modified);

var subscriber2 = modified.subscribe(
    function (x) { $("#result2").append('next: ' + x + '<br>'); },
    function (e) { $("#result2").append('onError: ' + e.message); },
    function () { $("#result2").append('onCompleted'); });
giannis christofakis
  • 8,201
  • 4
  • 54
  • 65
Laurence Fass
  • 1,614
  • 3
  • 21
  • 43

1 Answers1

2

You can review What are the semantics of different RxJS subjects? for some extra info on subjects.

Subjects are observables, so you can use the same operators which act on observables. Subjects are observers so you can subscribe them to sources.

What is wrong in your code is the following var modified = new Rx.Subject().take(2); source.subscribe(modified);

modified is not a subject anymore, it is only a regular observable, so you cannot subscribe it to source, you can only subscribe an observer to a source.

So do something like :

var newS = new Rx.Subject();
source.subscribe(newS);

var subscriber2 = newS.take(2).subscribe(
    function (x) { $("#result2").append('next: ' + x + '<br>'); },
    function (e) { $("#result2").append('onError: ' + e.message); },
    function () { $("#result2").append('onCompleted'); });
Community
  • 1
  • 1
user3743222
  • 18,345
  • 5
  • 69
  • 75