22

I have a hot observable (a subject in this case):

var subject = new Rx.Subject();

I want to create another observable that every time a new subscriptions is being made immediately fires out the last value that was produced.

So in pseudo code:

var myObservableWithLastValue = subject.publishLast();

subject.onNext(3);

myObservableWithLastValue.subscribe(function(x){
    console.log(x); //should write 3
});

myObservableWithLastValue.subscribe(function(x){
    console.log(x); //should write 3, too
});

subject.onNext(4);

myObservableWithLastValue.subscribe(function(x){
    console.log(x); //should write 4
});

This is roughly what I want and it seems to work. However, I guess there must be some built in mechanism to achieve the same

Rx.Observable.prototype.keepLatest = function () {
    var latestValue;

    var disposable = this.subscribe(function (value) {
        latestValue = value;
    });

    return Rx.Observable.create(function (observer) {
        observer.onNext(latestValue);
        return disposable.dispose;
    });
};
Gajus
  • 69,002
  • 70
  • 275
  • 438
Christoph
  • 26,519
  • 28
  • 95
  • 133

3 Answers3

21

RxJs now has the ReplaySubject. Initialize it with 1 buffer and you have the BehaviorSubject.

// as an example, use buffer size of 2
var subject = new Rx.ReplaySubject(2 /* buffer size */);

subject.onNext('a');
subject.onNext('b');
subject.onNext('c');

subject.subscribe(function (x) { document.write('x1:' + x + '<br>'); });

subject.onNext('d');

subject.subscribe(function (x) { document.write('x2:' + x + '<br>'); });
<script src='https://rawgit.com/Reactive-Extensions/RxJS/v.2.5.3/dist/rx.all.js'></script>
allprog
  • 16,540
  • 9
  • 56
  • 97
  • @gajus-kuizinas Can you tell me what is the major difference between your code and the original? I'm afraid I'm missing something. Additionally, it sets the buffer size to 2 so the last 2 values will be replayed every time. – allprog Aug 03 '15 at 08:55
  • None. I just made the example interactive and complete. The buffer size is used to demonstrate difference from using `BehaviorSubject` in the @Asti answer. Thats the reason I did not create a new asnwer, but just contributed to yours. – Gajus Aug 03 '15 at 09:58
  • @GajusKuizinas Ok. I see your point. Improved a little more to show the results in the results window instead of console :) – allprog Aug 03 '15 at 11:06
  • You really shouldn't be using `document.write` for this purpose. `console.log` allows to inspect and interact with the output. Thats opinion based suggestion, though. – Gajus Aug 03 '15 at 12:06
  • @GajusKuizinas With console.log you can't see anything in the results window. Do you have any better suggestion? console.log is ok but not too interactive – allprog Aug 03 '15 at 14:08
  • It is by design that you cannot see anything in the browser window... because you can see everything in your Console window (dev tools). – Gajus Aug 03 '15 at 15:29
7

BehaviorSubject:

Initializes a new instance of the Rx.BehaviorSubject class which creates a subject that caches its last value and starts with the specified value.

var subject = new Rx.BehaviorSubject('a' /* initial value */);

subject.subscribe(function (x) {
    console.log('x1:' + x);
});

subject.onNext('d');

// Will produce the last value.

subject.subscribe(function (x) {
    console.log('x2:' + x);
});
<script src='https://rawgit.com/Reactive-Extensions/RxJS/v.2.5.3/dist/rx.all.js'></script>
Gajus
  • 69,002
  • 70
  • 275
  • 438
Asti
  • 12,447
  • 29
  • 38
  • 2
    Ok, but is there no built in operator that takes an existing stream and applies the sketched behaviour? – Christoph Mar 16 '12 at 19:19
  • 1
    @Christoph Have you tried swapping your ordinary subject for a BehaviorSubject? – Asti Mar 16 '12 at 22:30
  • 1
    I'm not dealing with subjects directly. This was just to illustrate the behaviour better. I want an operator that applies on any hot infinite stream. – Christoph Mar 16 '12 at 22:48
  • 1
    Subjects implement both IObserver and IObservable. You should be able to get the behaviour subject to subscribe to your IObservable and then in turn Subscribe to the BehaviorSubject – Dave Hillier Jul 31 '12 at 19:23
0

I have used .cache(1) on a hot Observable. Seems to give the behaviour you asked for (but I am a novice).

observable that every time a new subscriptions is being made immediately fires out the last value that was produced.

mortenpi
  • 4,351
  • 2
  • 18
  • 20
  • 1
    FYI, my understanding is that `cache` has been [removed](https://github.com/ReactiveX/rxjs/pull/2012) from the upcoming release. – cartant Dec 03 '16 at 09:07