
Header

If you have essentially the same question and your context is Angular, you may want to read all the comments on the answer for more context.

Short version of this question

When doing let observe$ = someReplaySubject.asObservable(), are we no longer required to unsubscribe from observe$? In other words, can we call let observe$ = someReplaySubject.asObservable() from multiple Angular component instances and not worry about the connection from the ReplaySubject (the notify instance below) to the corresponding Observable?

Assumptions

If we have a ReplaySubject<Todo[]> instance (let's call it rxTodo$) and we subscribe to it from Angular components by doing:

let todoObs$:Observable<Todo[]> = rxTodo$.asObservable() then even if the component is destroyed, the todoObs$ references created in each component will dangle until the ReplaySubject itself is destroyed.

Background

I'm attempting a Store API for Angular and I have a replay subject that broadcasts changes to slices of the store. Here is the method that allows subscriptions to happen (The notifyCount tracks subscriptions, so that if there are none we don't bother with notification):

  /**
   * Subscribe to receive slice updates.
   * @example
     <pre>
    let todos$ = slice.subscribe();
    </pre>
  */
  public subscribe(): Observable<E[]> {
    this.notifyCount++;
    return this.notify.asObservable();
  }

Above, I'm attempting to follow the recommended best practice of returning an Observable instead of the ReplaySubject itself.

This is the corresponding unsubscribe method:

  /**
   * Unsubscribe from slice updates.
   * 
   * @example 
    <pre>
      slice.unsubscribe(o);
    </pre>
  */
  public unsubscribe(o: ReplaySubject<E[]>) {
    o.unsubscribe();
    this.notifyCount--;
  }

I had to make the o argument a ReplaySubject type in order to unsubscribe. However, that conflicts with the Observable type that the subscribe method returns.

When attempting to test like this:

incompleteSlice.unsubscribe(incomplete$);

The message returned is this:

[ts] Argument of type 'Observable' is not assignable to parameter of type 'ReplaySubject'. Property 'scheduler' is missing in type 'Observable'. let incomplete$: Observable

Any thoughts on how to fix this?

Update

One obvious thought that just came to mind is that perhaps returning asObservable means that we no longer need to actually unsubscribe from that observable. We can just leave it dangling in the event that the Angular component is destroyed?

Ole

1 Answer


I think you are confusing unsubscribing from Subscription and from Subject.

What is actually recommended in Angular is unsubscribing from any open Subscription when the component gets destroyed. A Subscription is returned from Observable.subscribe. When it gets unsubscribed, it doesn't receive values from the source observable anymore, which is what you want in most cases.

Unsubscribing from a Subject has a different effect. It switches the Subject into a closed state where you can no longer call next or subscribe to it. You can look directly at the source code to understand what's going on.

Unsubscribing from an Observable is not possible. That's in part why you get the error.
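To make the difference concrete, here is a minimal sketch. The names notify, received and sub are made up for illustration, and the behavior of next after unsubscribe reflects RxJS 6 and 7 as far as I know; other versions may differ.

```typescript
import { ReplaySubject, Subscription } from "rxjs";

// Hypothetical stand-in for the store's notifier from the question.
const notify = new ReplaySubject<number[]>(1);
const received: number[][] = [];

// subscribe() returns a Subscription; that is the handle you unsubscribe.
const sub: Subscription = notify
  .asObservable()
  .subscribe(value => received.push(value));

notify.next([1]);
sub.unsubscribe();   // only this one listener stops receiving values
notify.next([1, 2]); // the subject still works for any other subscribers

// Unsubscribing the subject itself closes it for everyone.
notify.unsubscribe();
let nextThrew = false;
try {
  notify.next([1, 2, 3]);
} catch {
  // Assumption: RxJS 6 and 7 throw ObjectUnsubscribedError here.
  nextThrew = true;
}
```

So the store's unsubscribe method would probably want to accept a Subscription rather than a ReplaySubject or an Observable.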

To follow the recommendation, you can keep a list of all Subscriptions, adding each Subscription to a list that you can access later, like this:

this.subscriptions$.push(obs$.subscribe(...));

And then on component destroy, call:

this.subscriptions$.forEach(sub => sub.unsubscribe());
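Put together, the list approach could look roughly like the sketch below. TodoListComponent is a hypothetical plain class so the example runs without Angular; a real component would carry the @Component decorator and implement OnDestroy, and Angular would call ngOnDestroy for you.

```typescript
import { Observable, Subject, Subscription } from "rxjs";

// Hypothetical component, written as a plain class for illustration.
class TodoListComponent {
  private subscriptions: Subscription[] = [];
  todos: string[] = [];

  constructor(todos$: Observable<string[]>) {
    // Keep every Subscription so it can be closed later.
    this.subscriptions.push(
      todos$.subscribe(todos => (this.todos = todos))
    );
  }

  ngOnDestroy(): void {
    // Close every open Subscription when the component goes away.
    this.subscriptions.forEach(sub => sub.unsubscribe());
    this.subscriptions = [];
  }
}

// Usage: the store keeps emitting, but the destroyed component ignores it.
const listSource = new Subject<string[]>();
const listComponent = new TodoListComponent(listSource.asObservable());
listSource.next(["a"]);
listComponent.ngOnDestroy();
listSource.next(["a", "b"]);
```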

Even better, and recommended by RxJS core developer Ben Lesh in this article, is not to call unsubscribe imperatively but to use the takeUntil operator instead.

There is a reference implementation on StackOverflow for using this pattern with Angular that you can use as a starting point.
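As a rough sketch of the takeUntil pattern (not the linked reference implementation): the component owns a destroy$ subject and every stream it subscribes to is piped through takeUntil(destroy$). TodoCountComponent, destroy$ and store$ are hypothetical names, and the class is a plain one so that it runs outside Angular.

```typescript
import { Observable, ReplaySubject, Subject } from "rxjs";
import { takeUntil } from "rxjs/operators";

// Hypothetical component, written as a plain class for illustration.
class TodoCountComponent {
  private readonly destroy$ = new Subject<void>();
  count = 0;

  constructor(todos$: Observable<string[]>) {
    todos$
      .pipe(takeUntil(this.destroy$)) // completes once destroy$ emits
      .subscribe(todos => (this.count = todos.length));
  }

  ngOnDestroy(): void {
    // One emission tears down every stream piped through takeUntil.
    this.destroy$.next();
    this.destroy$.complete();
  }
}

// Usage with a store-like ReplaySubject exposed via asObservable().
const store$ = new ReplaySubject<string[]>(1);
const countComponent = new TodoCountComponent(store$.asObservable());
store$.next(["a", "b"]);
countComponent.ngOnDestroy();
store$.next(["a", "b", "c"]); // ignored: the subscription already ended
```

The appeal is that there is no list of Subscriptions to maintain; each new stream only needs the takeUntil call.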

ggradnig
  • Thanks! Great articles. Do you know by chance whether using Observable instances that are generated from `ReplaySubject.asObservable()` within our Angular components will create memory leaks? Or is it just plain wrong to use `ReplaySubject.asObservable()` .... – Ole Sep 03 '18 at 18:51
  • I updated the summary question a bit to reflect the comment ... could be I'm still confused :) – Ole Sep 03 '18 at 19:04
  • As long as the garbage collector picks up the unused observable reference, there should be no memory leak. You can call `asObservable` as often as you like. It just prevents hacking the Subject with stuff like `iAmNotASubject$["next"]()` – ggradnig Sep 03 '18 at 19:08
  • If your component is disposed, the garbage collector will probably kill all observables that are referenced by the component instance, so no memory worries there – ggradnig Sep 03 '18 at 19:09
  • Very cool. I'm a little curious about how the link to the producer is cancelled though? For example suppose the Angular component gets a reference to the `Store` holding a bunch of `Todo` instances and it calls `ReplaySubject.asObservable()` on the store and later the component is destroyed along with the `Observable` instance. Won't the link to the `ReplaySubject` keep the `Observable` from being garbage collected? – Ole Sep 03 '18 at 19:18
  • The `ReplaySubject` is the only one who knows about the `Observable`s that were created by using its `asObservable` function, apart from the subscribers of course. But as long as all **Subscriptions** to these observables are unsubscribed as well, a disposal of `ReplaySubject` also leads to the disposal of all its `Observable`s – ggradnig Sep 03 '18 at 19:24
  • Ah - Got it. In this case I was planning on creating a single `ReplaySubject` attached to the store that returns an `asObservable()` any time an Angular component calls `subscribe()` on the store, which then delegates to the `ReplaySubject`. So I think we have to assume that all the `Observable` instance returned from `asObservable()` will dangle until the `ReplaySubject` itself is destroyed (Which in this case would not happen until the application shuts down ...)? – Ole Sep 03 '18 at 19:28
  • I think perhaps the right thing to do is to create one `ReplaySubject` per Angular instance / per client that wants to subscribe to receive update notifications, and then iterate through all the `ReplaySubject` instances to broadcast `Store` data events, but then that sort of defeats the entire purpose of multicasting ... – Ole Sep 03 '18 at 19:33
  • Hmmm .... After reading your answer again I think what perhaps is necessary is to unsubscribe the `subscription` reference that is implicit in the `asObservable` return value and hopefully that breaks the link? – Ole Sep 03 '18 at 19:44
  • Ah, actually I think I was wrong concerning Subject knowing about the Observables created with `asObservable`. There is no such hint in the source code that a reference is kept. – ggradnig Sep 03 '18 at 19:44
  • You should unsubscribe all subscriptions that are created with subject$.asObservable().subscribe(...) – ggradnig Sep 03 '18 at 19:45
  • I think there has to be one though, otherwise how would the `Subject` be able to multicast to all the Observables? – Ole Sep 03 '18 at 19:45
  • The observables know their subject, not the other way round. There is always a subject behind an observable – ggradnig Sep 03 '18 at 19:45
  • That makes sense, but what if the component gets the observable and it's implicitly subscribed to in the components template? In other words we never actually call obs.subscribe(), but instead use `*ngFor todo of todos$ | async` – Ole Sep 03 '18 at 19:48
  • Then Angular handles unsubscribing for you. This is actually preferable to unsubscribing manually. – ggradnig Sep 03 '18 at 19:49
  • Hmmm ... but if the `ReplaySubject` instance is not aware of the `Observable` that it's sending a message to, then how does it send the message to it? – Ole Sep 03 '18 at 19:49
  • Booooffff - OK - Mind blown :)! That makes sense! .... the Angular handling the unsubscribe part ... I think it's making sense now ... Thank you so much for hanging in there with me on this. – Ole Sep 03 '18 at 19:51
  • If you call `subscribe()` on an observable created by `asObservable` you'll add the observer (your `next` function) to the subject's observer list. – ggradnig Sep 03 '18 at 19:52
  • Okay, glad I could help :) – ggradnig Sep 03 '18 at 19:52
  • OK - I assume then that when Angular calls `unsubscribe()` on our behalf, the observer is then removed from the `subjects` list? – Ole Sep 03 '18 at 19:53
  • This fixed an `Error: ViewDestroyedError: Attempt to use a destroyed view: detectChanges at viewDestroyedError` for me; the subscription was still being referenced by Angular, even though the relevant component had been destroyed. Then change detection puked when trying to update the (now removed) component. – tengen Aug 19 '19 at 15:19
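The mechanism discussed in the comments (the observable knows its subject, and subscribing adds an observer to the subject's list) can be illustrated with a deliberately simplified model. This is not the RxJS source; MiniSubject, MiniObservable and every member on them are invented here purely to show the direction of the references.

```typescript
// Simplified model for illustration only; this is NOT the RxJS source.
type Observer<T> = (value: T) => void;

interface MiniSubscription {
  unsubscribe(): void;
}

class MiniObservable<T> {
  // The observable holds a reference to its source, not the other way round.
  constructor(private readonly source: MiniSubject<T>) {}

  subscribe(observer: Observer<T>): MiniSubscription {
    return this.source.addObserver(observer);
  }
}

class MiniSubject<T> {
  private observers: Observer<T>[] = [];

  asObservable(): MiniObservable<T> {
    // Nothing is stored here: the subject does not track the wrapper.
    return new MiniObservable<T>(this);
  }

  addObserver(observer: Observer<T>): MiniSubscription {
    this.observers.push(observer);
    return {
      unsubscribe: () => {
        // Removing the observer is what breaks the link to the listener.
        this.observers = this.observers.filter(o => o !== observer);
      },
    };
  }

  next(value: T): void {
    // Iterate over a copy so unsubscribing mid-emit is safe.
    this.observers.slice().forEach(o => o(value));
  }

  get observerCount(): number {
    return this.observers.length;
  }
}

const mini = new MiniSubject<number>();
const wrapper = mini.asObservable(); // subject still has no observers
const seen: number[] = [];
const miniSub = wrapper.subscribe(v => seen.push(v)); // now it has one
mini.next(1);
miniSub.unsubscribe(); // observer removed from the subject's list
mini.next(2);          // nobody receives this value
```

In this model the wrapper returned by asObservable() can be garbage collected as soon as nothing else references it; only an active subscription keeps a listener reachable from the subject.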