I am new to the RxJS library and am trying to figure out how to use Observables and Subjects properly. I am trying to draw parallels with the Observer design pattern. My question is: is an instance of Observable from RxJS a special case of the Observer design pattern?
1 Answer
An Observable is, by definition, an entity which emits data over time. This sounds a bit vague and, at the same time, very interesting.
In my opinion, all of RxJS's magic is achieved with linked lists.
Whenever you create an Observable using new Observable(subscriber => {}), you are defining the source, or the HEAD node, of a linked list. Also, have you ever wondered why the parameter is called subscriber or observer? I'll try to share my view on that as well.
The main linked list is created with the help of Observable.pipe():
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;
}
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }
  if (fns.length === 1) {
    return fns[0];
  }
  return function piped(input: T): R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}
and Observable.lift():
protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  observable.source = this;
  observable.operator = operator;
  return observable;
}
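To make the linked-list idea more concrete, here is a minimal sketch of how pipe() and lift() could chain nodes together. ChainNode, operatorName and named() are hypothetical names made up for this illustration; it is a simplified model, not the RxJS source.

```typescript
// Hypothetical, simplified model of the Observable chain; not the RxJS source.
class ChainNode {
  // Points at the previous node; the HEAD node has no source.
  source?: ChainNode;
  // Stands in for the Operator object a real node would hold.
  operatorName?: string;

  // Same shape as Observable.lift(): create a node that points back at `this`.
  lift(operatorName: string): ChainNode {
    const node = new ChainNode();
    node.source = this;
    node.operatorName = operatorName;
    return node;
  }

  // Same shape as Observable.pipe(): feed the result of each function to the next.
  pipe(...fns: Array<(input: ChainNode) => ChainNode>): ChainNode {
    return fns.reduce<ChainNode>((prev, fn) => fn(prev), this);
  }
}

// Hypothetical helper: builds an "operator function" that lifts a named operator.
const named = (operatorName: string) => (input: ChainNode): ChainNode => input.lift(operatorName);

const head = new ChainNode();
const tail = head.pipe(named('map'), named('filter'));

// Walk from the tail back to the HEAD by following the `source` links.
const visitedOperators: string[] = [];
for (let node: ChainNode | undefined = tail; node; node = node.source) {
  visitedOperators.push(node.operatorName ?? 'HEAD');
}
console.log(visitedOperators.join(' -> ')); // filter -> map -> HEAD
```

Each call in pipe() adds one node, and every node only knows its predecessor, which is why the list is walked from the last operator back to the source.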
As you know, in RxJS there are many operators. An operator is a function which returns another function whose argument is an Observable (of type T) and whose return value is also an Observable (of type R).
For example, map():
export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return function mapOperation(source: Observable<T>): Observable<R> {
    if (typeof project !== 'function') {
      throw new TypeError('argument is not a function. Are you looking for `mapTo()`?');
    }
    return lift(source, new MapOperator(project, thisArg));
  };
}
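The "function which returns a function" shape can be tried out in isolation. The sketch below assumes a hypothetical TinyObservable that only supports a next callback, and a hypothetical tinyMap() with the same outer shape as map(); it skips lift() and MapOperator entirely, so treat it as an illustration of the signature only.

```typescript
// Hypothetical minimal stand-in for an Observable: it only knows how to
// push values to a `next` callback when someone subscribes.
class TinyObservable<T> {
  constructor(private readonly producer: (next: (value: T) => void) => void) {}

  subscribe(next: (value: T) => void): void {
    this.producer(next);
  }
}

// An operator function: takes a TinyObservable<T>, returns a TinyObservable<R>.
type TinyOperatorFunction<T, R> = (source: TinyObservable<T>) => TinyObservable<R>;

// Same outer shape as map(): configuration goes in, an operator function comes out.
function tinyMap<T, R>(project: (value: T, index: number) => R): TinyOperatorFunction<T, R> {
  return function mapOperation(source: TinyObservable<T>): TinyObservable<R> {
    return new TinyObservable<R>((next) => {
      let index = 0;
      // Subscribing to the result subscribes to the source with a wrapping callback.
      source.subscribe((value) => next(project(value, index++)));
    });
  };
}

const numbers$ = new TinyObservable<number>((next) => {
  next(1);
  next(2);
  next(3);
});

const doubled: number[] = [];
tinyMap((n: number) => n * 2)(numbers$).subscribe((value) => doubled.push(value));
console.log(doubled); // [2, 4, 6]
```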
So, when you have
const src$ = new Observable(s => /* ... */)
  .pipe(
    map(/* ... */)
  )
a few things will happen:
- first, it will create the Observable instance; the provided callback (in this case s => ...) will be stored in the _subscribe property
- pipe() is invoked; pipeFromArray() will return fns[0], in this case the mapOperation function
- mapOperation will be invoked with the Observable instance as its argument (from pipeFromArray(operations)(this)); when invoked, it will call source.lift(new MapOperator(project, thisArg));
- Observable.lift() is what adds nodes to this linked list; as you can see, a node (apart from HEAD) holds the source and the operator which represents it
When you subscribe to src$, based on this list, another one will be created. In this one, each node will be a Subscriber. The creation of this list is based on the fact that each operator must have a call method:
export interface Operator<T, R> {
  call(subscriber: Subscriber<R>, source: any): TeardownLogic;
}
export class MapOperator<T, R> implements Operator<T, R> {
  constructor(private project: (value: T, index: number) => R, private thisArg: any) {
  }
  call(subscriber: Subscriber<R>, source: any): any {
    return source.subscribe(new MapSubscriber(subscriber, this.project, this.thisArg));
  }
}
The relations between the Subscriber nodes are established in Observable.subscribe(). In this case, the s parameter from new Observable(s => ...) (the above example) will be the MapSubscriber.
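Here is a rough sketch of how the second list, the one made of subscribers, could come into existence at subscribe time. MiniObservable, MiniSubscriber, MiniOperator, MiniMapOperator and the label field are all hypothetical and only exist to make the wrapping visible; the real Observable.subscribe() does considerably more (teardown, error handling and so on).

```typescript
// Hypothetical, simplified model of the two lists; not the RxJS source.
interface MiniSubscriber<T> {
  label: string;
  next(value: T): void;
}

interface MiniOperator<T, R> {
  // Same role as Operator.call(): wrap the downstream subscriber and subscribe upstream.
  call(subscriber: MiniSubscriber<R>, source: MiniObservable<T>): void;
}

class MiniObservable<T> {
  source?: MiniObservable<any>;
  operator?: MiniOperator<any, T>;

  constructor(private readonly _subscribe?: (subscriber: MiniSubscriber<T>) => void) {}

  lift<R>(operator: MiniOperator<T, R>): MiniObservable<R> {
    const observable = new MiniObservable<R>();
    observable.source = this;
    observable.operator = operator;
    return observable;
  }

  subscribe(subscriber: MiniSubscriber<T>): void {
    if (this.operator && this.source) {
      // Not the HEAD: the operator adds a subscriber node and moves one step upstream.
      this.operator.call(subscriber, this.source);
    } else if (this._subscribe) {
      // HEAD: run the callback that was given to the constructor.
      this._subscribe(subscriber);
    }
  }
}

class MiniMapOperator<T, R> implements MiniOperator<T, R> {
  constructor(private readonly project: (value: T) => R) {}

  call(subscriber: MiniSubscriber<R>, source: MiniObservable<T>): void {
    source.subscribe({
      label: 'MapSubscriber',
      next: (value: T) => subscriber.next(this.project(value)),
    });
  }
}

let labelSeenByHead = '';
const received: number[] = [];

const mapped$ = new MiniObservable<number>((s) => {
  labelSeenByHead = s.label; // which subscriber does the HEAD callback get?
  s.next(1);
  s.next(2);
}).lift(new MiniMapOperator((n: number) => n + 10));

mapped$.subscribe({ label: 'final', next: (value: number) => received.push(value) });
console.log(labelSeenByHead, received); // MapSubscriber [ 11, 12 ]
```

The callback at the HEAD never sees the final subscriber directly; it only talks to the node that the map operator placed in front of it.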
It may seem that I deviated from the question, but with the above explanations I wanted to demonstrate that there is not much of the Observer pattern here.
This pattern can be achieved with a Subject, which extends Observable:
export class Subject<T> extends Observable<T> implements SubscriptionLike { }
and this means that you can use Subject.pipe(...) and Subject.subscribe(subscriber). What Subject does in order to achieve this pattern is to have a custom _subscribe method:
_subscribe(subscriber: Subscriber<T>): Subscription {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  } else if (this.hasError) {
    subscriber.error(this.thrownError);
    return Subscription.EMPTY;
  } else if (this.isStopped) {
    subscriber.complete();
    return Subscription.EMPTY;
  } else {
    // !!!
    this.observers.push(subscriber);
    return new SubjectSubscription(this, subscriber);
  }
}
As you can see, the Subject class keeps track of its observers (subscribers), so that when it emits a value, with Subject.next(), all its observers will receive it:
next(value: T) {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  }
  if (!this.isStopped) {
    const { observers } = this;
    const len = observers.length;
    const copy = observers.slice();
    for (let i = 0; i < len; i++) {
      copy[i].next(value!);
    }
  }
}
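Stripped of the closed/error/complete bookkeeping, this is the classic Observer pattern: keep a list of observers and notify all of them. A minimal sketch, assuming a hypothetical MiniSubject and MiniObserver (the unsubscribe function returned by subscribe() is also a simplification standing in for SubjectSubscription):

```typescript
// Hypothetical, stripped-down Subject: no error/complete handling, no closed state.
interface MiniObserver<T> {
  next(value: T): void;
}

class MiniSubject<T> {
  private observers: MiniObserver<T>[] = [];

  // Registers the observer and returns a function that removes it again.
  subscribe(observer: MiniObserver<T>): () => void {
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }

  // Broadcasts to a copy of the list, so unsubscribing during emission is safe.
  next(value: T): void {
    for (const observer of this.observers.slice()) {
      observer.next(value);
    }
  }
}

const subject = new MiniSubject<string>();
const logA: string[] = [];
const logB: string[] = [];

const unsubscribeA = subject.subscribe({ next: (v: string) => logA.push(v) });
subject.subscribe({ next: (v: string) => logB.push(v) });

subject.next('first'); // both observers receive it
unsubscribeA();
subject.next('second'); // only B is still registered

console.log(logA, logB); // [ 'first' ] [ 'first', 'second' ]
```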
As a side note, a Subject can act as a Subscriber as well, so you don't have to manually call Subject.{next, error, complete}() all the time. You can achieve that by passing the Subject to subscribe(), with something like this:
src$.subscribe(subjectInstance);
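A small sketch of why this works: a Subject has the same next() method an observer has, so it can sit between a source and several observers. RelaySubject, ValueObserver and coldSource are hypothetical stand-ins written for this example; with the real library you would pass an actual Subject to the subscribe() method of an actual Observable.

```typescript
// Hypothetical minimal types; the real Subject also handles error() and complete().
interface ValueObserver<T> {
  next(value: T): void;
}

class RelaySubject<T> implements ValueObserver<T> {
  private readonly observers: ValueObserver<T>[] = [];

  subscribe(observer: ValueObserver<T>): void {
    this.observers.push(observer);
  }

  next(value: T): void {
    this.observers.slice().forEach((observer) => observer.next(value));
  }
}

// Stand-in for a cold source: every subscribe() call runs the producer again.
let producerRuns = 0;
const coldSource = {
  subscribe(observer: ValueObserver<number>): void {
    producerRuns++;
    observer.next(1);
    observer.next(2);
  },
};

const relay = new RelaySubject<number>();
const seenByFirst: number[] = [];
const seenBySecond: number[] = [];
relay.subscribe({ next: (v: number) => seenByFirst.push(v) });
relay.subscribe({ next: (v: number) => seenBySecond.push(v) });

// The subject itself is the observer here, so nobody calls relay.next() by hand.
coldSource.subscribe(relay);
console.log(producerRuns, seenByFirst, seenBySecond); // 1 [ 1, 2 ] [ 1, 2 ]
```

The producer runs once, and both observers get the same values through the subject.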
