
I am using RxJS together with Angular 2 and TypeScript. I would like to share a common web resource (a "project" in the context of my app, essentially a JSON document) between multiple components. To achieve this I introduced a service that exposes an observable, which will be shared by all clients:

/**
 * Handed out to clients so they can subscribe to something.
 */
private _observable : Observable<Project>;

/**
 * Used to emit events to clients.
 */
private _observer : Observer<Project>;

constructor(private _http: Http) {
    // Create observable and observer once and for all. These instances
    // are not allowed to change as they are passed on to every subscriber.
    this._observable = Observable.create( (obs : Observer<Project>) => {
        this._observer = obs;
    });
}

Clients now simply get a reference to that one _observable and subscribe to it.

/**
 * Retrieves an observable that always points to the active
 * project.
 */
get ActiveProject() : Observable<Project> {
    return (this._observable);
}

When some component decides to actually load a project, it calls the following method:

/**
 * @param id The id of the project to set for all subscribers
 */
setActiveProject(id : string) {
    // Projects shouldn't change while other requests are in progress
    if (this._httpRequest) {
        throw { "err" : "HTTP request in progress" };
    }

    this._httpRequest = this._http.get('/api/project/' + id)
        .catch(this.handleError)
        .map(res => new Project(res.json()));

    this._httpRequest.subscribe(res => {
        // Cache the project
        this._cachedProject = res;
        // Show that there are no more requests
        this._httpRequest = null;
        // Inform subscribers
        this._observer.next(this._cachedProject)

        console.log("Got project");
    });
}

It basically does an HTTP request, transforms the JSON document into a "proper" instance and calls this._observer.next() to inform all subscribers about the change.

But if something subscribes after the HTTP request has already taken place, it sees nothing until a new HTTP request is issued. I have found out that there is some kind of caching (or replay?) mechanism in RxJS that seems to address this, but I couldn't figure out how to use it.

tl;dr: How do I ensure that a call to subscribe on the observable initially receives the most recent value?

Extra question: By "pulling the observer out of the observable" (in the constructor), have I essentially created a subject?

user3743222
Marcus Riemer

2 Answers


That's what BehaviorSubject does: it hands the most recent value (or the initial value) to every new subscriber.

import { BehaviorSubject } from 'rxjs/subject/BehaviorSubject';
...
const obs = new BehaviorSubject(4);
obs.subscribe(v => console.log('A', v)); // prints A 4
obs.next(3);                             // prints A 3
obs.subscribe(v => console.log('B', v)); // prints B 3
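To make the "latest value on subscribe" behaviour concrete without pulling in RxJS, here is a minimal sketch of a subject-like class that remembers its current value. It is a simplified stand-in, not the RxJS implementation; the names LatestValueSubject, activeProject, early and late are hypothetical, and the Project interface is reduced to an id for illustration.

```typescript
type Listener<T> = (value: T) => void;

// Simplified stand-in for BehaviorSubject-like semantics (NOT the RxJS class).
class LatestValueSubject<T> {
    private current: T;
    private listeners: Listener<T>[] = [];

    constructor(initial: T) {
        this.current = initial;
    }

    // Remember the value, then push it to every current subscriber.
    next(value: T): void {
        this.current = value;
        for (const listener of this.listeners.slice()) {
            listener(value);
        }
    }

    // A new subscriber immediately receives the remembered value.
    // Returns a function that removes the subscription again.
    subscribe(listener: Listener<T>): () => void {
        this.listeners.push(listener);
        listener(this.current);
        return () => {
            this.listeners = this.listeners.filter(l => l !== listener);
        };
    }
}

// Hypothetical usage mirroring the question; null means "no project loaded yet".
interface Project { id: string; }

const activeProject = new LatestValueSubject<Project | null>(null);
const early: (Project | null)[] = [];
const late: (Project | null)[] = [];

activeProject.subscribe(p => early.push(p));  // subscribed before the "request"
activeProject.next({ id: "42" });             // stands in for the HTTP response
activeProject.subscribe(p => late.push(p));   // subscribed after the "request"
```

In this sketch the early subscriber sees the initial null followed by the project, while the late subscriber receives the project right away. In the service from the question, the same role would presumably be played by a BehaviorSubject<Project> whose next() is called in the HTTP callback.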
Günter Zöchbauer
  • Docs are here: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/behaviorsubject.md and it says: "Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications." – Anders Zommarin May 11 '17 at 06:08
  • Reactive-Extensions is rxjs4, but Angular uses rxjs5 which is https://github.com/ReactiveX/rxjs/blob/master/doc/subject.md – Günter Zöchbauer May 11 '17 at 06:13

I usually achieve this with shareReplay(1). With 1 as the argument, the operator keeps the latest emitted value in a buffer, so a new subscriber immediately receives that value. The example from the documentation (written against the RxJS 4 API, here with a buffer of 3) illustrates this:

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(4)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source
    .shareReplay(3);

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

// Creating a third subscription after the previous two subscriptions have
// completed. Notice that no side effects result from this subscription,
// because the notifications are cached and replayed.
Rx.Observable
    .return(true)
    .delay(6000)
    .flatMap(published)
    .subscribe(createObserver('SourceC'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) {
            console.log('Next: ' + tag + x);
        },
        function (err) {
            console.log('Error: ' + err);
        },
        function () {
            console.log('Completed');
        });
}

// => Side effect
// => Next: SourceA0
// => Next: SourceB0
// => Side effect
// => Next: SourceA1
// => Next: SourceB1
// => Side effect
// => Next: SourceA2
// => Next: SourceB2
// => Side effect
// => Next: SourceA3
// => Next: SourceB3
// => Completed
// => Completed
// => Next: SourceC1
// => Next: SourceC2
// => Next: SourceC3
// => Completed
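The buffering idea behind shareReplay(n) can be sketched in isolation. The class below is a hypothetical, dependency-free stand-in (ReplayBuffer is not an RxJS type) that only models the replay buffer. It deliberately ignores what the real operator also does, such as sharing a single subscription to the source and handling completion or errors.

```typescript
// Simplified stand-in for a replay buffer of fixed size (NOT an RxJS class).
class ReplayBuffer<T> {
    private bufferSize: number;
    private buffer: T[] = [];
    private listeners: Array<(value: T) => void> = [];

    constructor(bufferSize: number) {
        this.bufferSize = bufferSize;
    }

    // Store the value, drop the oldest one if the buffer is full,
    // then notify the current subscribers.
    next(value: T): void {
        this.buffer.push(value);
        if (this.buffer.length > this.bufferSize) {
            this.buffer.shift();
        }
        for (const listener of this.listeners.slice()) {
            listener(value);
        }
    }

    // Replay the buffered values first, then deliver future values.
    subscribe(listener: (value: T) => void): void {
        for (const value of this.buffer) {
            listener(value);
        }
        this.listeners.push(listener);
    }
}

const replayOne = new ReplayBuffer<number>(1);
const seenByLate: number[] = [];

replayOne.next(0);
replayOne.next(1);
replayOne.next(2);
replayOne.subscribe(v => seenByLate.push(v)); // replays only the latest value, 2
replayOne.next(3);                            // delivered live
// seenByLate is now [2, 3]
```

With a buffer size of 1 the late subscriber gets only the most recent value, which is the behaviour the question asks for. Unlike the BehaviorSubject approach, nothing is delivered if no value has been emitted yet.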

Extra question: By "pulling the observer out of the observable" (in the constructor), have I essentially created a subject?

I am not sure what you mean by that, but no. A subject is both an observer and an observable and has specific semantics. It is not enough to 'pull the observer out of the observable', as you say. For subject semantics, have a look here: What are the semantics of different RxJS subjects?
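One concrete way the captured observer differs from a subject: the function passed to Observable.create runs once per subscription, so each new subscriber overwrites the stored observer and only the most recent subscriber is notified. The sketch below models this with a hypothetical, dependency-free stand-in (ColdObservable and CapturingService are illustrative names, not RxJS types).

```typescript
type Observer<T> = { next: (value: T) => void };

// Simplified stand-in for a cold observable: the subscribe function
// is invoked anew for every subscriber (as with Observable.create).
class ColdObservable<T> {
    private onSubscribe: (observer: Observer<T>) => void;

    constructor(onSubscribe: (observer: Observer<T>) => void) {
        this.onSubscribe = onSubscribe;
    }

    subscribe(next: (value: T) => void): void {
        this.onSubscribe({ next: next });
    }
}

// Mirrors the constructor from the question: the observer is captured
// in a field, which every later subscription overwrites.
class CapturingService {
    observer: Observer<string> | undefined = undefined;
    observable: ColdObservable<string>;

    constructor() {
        this.observable = new ColdObservable<string>(obs => {
            this.observer = obs;
        });
    }

    emit(value: string): void {
        // Without any subscriber there is no observer to call at all.
        if (this.observer) {
            this.observer.next(value);
        }
    }
}

const service = new CapturingService();
const first: string[] = [];
const second: string[] = [];

service.observable.subscribe(v => first.push(v));
service.observable.subscribe(v => second.push(v)); // replaces the captured observer
service.emit("project");
// first stays empty; second is ["project"]
```

A subject, by contrast, keeps a list of all its subscribers and multicasts to each of them, which is why the captured-observer pattern is not equivalent.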

Community
user3743222