5

I am a novice user in the library RXJS and trying to figure out how to use properly Observable and Subjects. I am trying to draw parallels with pattern design Observer. At some point, I have a question if the instance of the Observable from the library RXJS is a special case of Observer pattern design?

Michael
  • 13,950
  • 57
  • 145
  • 288

1 Answers1

5

An Observable is, by definition, an entity which emits data over time. This sounds a bit vague and, at the same time, very interesting.

In my opinion, all the RxJS' magic is achieved with linked lists.

Whenever you create an Observable using new Observable(subscriber => {}), you are defining the source, or the HEAD node of a linked list. Also, have you ever wondered why the parameter is called subscriber or observer? I'll try to share my view on that as well.

The main linked list is created with the help of Observable.pipe():

pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;
}

export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  return function piped(input: T): R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}

and Observable.lift():

protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  observable.source = this;
  observable.operator = operator;
  return observable;
}

As you know, in RxJS there are many operators. An operator is a function which returns another function whose argument is an Observable(of type T) and whose return value is also an Observable(of type R).

For example, map():

export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return function mapOperation(source: Observable<T>): Observable<R> {
    if (typeof project !== 'function') {
      throw new TypeError('argument is not a function. Are you looking for `mapTo()`?');
    }
    return lift(source, new MapOperator(project, thisArg));
  };
}

So, when you have

const src$ = new Observable(s => /* ... */)
  .pipe(
    map(/* ... */)
  )

a few things will happen:

  • first, it will create the Observable instance; the provided callback(in this case s => ...) will be stored in the _subscribe property
  • pipe() is invoked; it will return fns[0], in this case the mapOperation function
  • mapOperation will be invoked with the Observable instance as its argument(from pipeFromArray(operations)(this)); when invoked, it will call source.lift(new MapOperator(project, thisArg));; Observable.lift() is what adds nodes to this linked list; as you can see, a node(apart from HEAD) holds the source and the operator which represents it

When you subscribe to src$, based on this list, another one will be created. In this one, each node will be a Subscriber. The creation of this list is based on the fact that each operator must have a call method

export interface Operator<T, R> {
  call(subscriber: Subscriber<R>, source: any): TeardownLogic;
}

MapOperator is no exception:

export class MapOperator<T, R> implements Operator<T, R> {
  constructor(private project: (value: T, index: number) => R, private thisArg: any) {
  }

  call(subscriber: Subscriber<R>, source: any): any {
    return source.subscribe(new MapSubscriber(subscriber, this.project, this.thisArg));
  }
}

the relations between the Subscriber nodes are established in Observable.subscribe()

In this case, the s parameter from new Observable(s => ...)(the above example) will be the MapSubscriber.

It may seem that I deviated from the question, but with the above explanations I wanted to demonstrate that there is not much of the Observer pattern here.

This pattern can be achieved with a Subject, which extends Observable:

export class Subject<T> extends Observable<T> implements SubscriptionLike { }

and this means that you can use Subject.pipe(...) and Subject.subscribe(subscriber). What Subject does in order to achieve this pattern is to have a custom _subscribe method:

_subscribe(subscriber: Subscriber<T>): Subscription {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  } else if (this.hasError) {
    subscriber.error(this.thrownError);
    return Subscription.EMPTY;
  } else if (this.isStopped) {
    subscriber.complete();
    return Subscription.EMPTY;
  } else {
    
    // !!!
    this.observers.push(subscriber);
    return new SubjectSubscription(this, subscriber);
  }
}

as you can see, the Subject class keeps track of its observers(subscribers), so that when it emits a value, with Subject.next(), all its observers will receive it:

next(value: T) {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  }
  if (!this.isStopped) {
    const { observers } = this;
    const len = observers.length;
    const copy = observers.slice();
    for (let i = 0; i < len; i++) {
      copy[i].next(value!);
    }
  }
  }

As a side node, a Subject can act as a Subscriber as well, so you don't have to manually call Subject.{next, error, complete}() all the time. You can achieve that with something like this

src$.pipe(subjectInstance);
Andrei Gătej
  • 11,116
  • 1
  • 14
  • 31