I'm implementing an Angular service that lets consumers observe various values based on their id.

The essence of it looks like this:

private subjects = new Map<number, Subject<any>>();

public subscribe(id: number, observer: any): Subscription {
  // try getting subject for this id (or undefined if it does not yet exist)
  let subj = this.subjects.get(id);

  // create subject if it does not yet exist
  if (!subj) {
    subj = new Subject<any>();
    this.subjects.set(id, subj);
  }

  // subscribe observer
  const subscription = subj.subscribe(observer);

  // set up teardown logic (gets called when subscription is unsubscribed)
  subscription.add(() => { 
    // remove subject from the map, if this was the last subscription
    if (subj.observers.length == 0) {
      this.subjects.delete(id);
    }
  });

  // return subscription
  return subscription;
}

Here is the full StackBlitz example.

The above works fine but the API is a bit cumbersome to use (in the consumers I need to manually keep track of all the subscriptions and make sure to unsubscribe them properly).

I would prefer to have a method that returns an Observable like this:

public subscribe(id: number): Observable<any> {
  // TODO: Return an observable for this id and make sure that
  // its corresponding subject is in the map iff at least one of the observables
  // for this id has at least one subscription.
  
  return ...;
}

This would allow me to subscribe to the values I need directly from the component templates using the async pipe, and Angular would take care of unsubscribing the observers.

But I can't quite figure out how to implement the logic that removes Subjects from the Map once they are no longer used. Is there a good way to do that?

Here is an incomplete StackBlitz example with some test cases.

Robert Hegner

1 Answer

I think you could try something like this:

function subscribe(id: number): Observable<any> {

  /* ... */

  return subj
    .pipe(
      finalize(() => {
        if (subj.observers.length == 0) {
          this.subjects.delete(id);
        }
      })
    );
}

With this, you can also use the async pipe with the AnonymousSubject returned by Subject.lift() (which is called as a result of Subject.pipe()). AnonymousSubject makes sure that the observers (e.g. from the template) are added to the observer list of the AnonymousSubject's parent Subject.

finalize() is called when the subscription to the source (e.g. the Subject) is unsubscribed. This can happen either when the component is destroyed or when a complete/error event occurs, which also includes the case when the Subject completes. When a Subject completes, it sends a complete notification to all of its subscribers, meaning that the observers will eventually be removed from the Subject's observer list automatically.
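
To make the timing concrete, here is a minimal sketch, assuming RxJS 6.x or 7.x is installed. The names `log` and `piped$` are purely illustrative and not part of the service. It records when the finalize() callback runs for two subscribers of the same piped Subject: once when the first subscriber unsubscribes, and once when the Subject completes.

```typescript
import { Subject } from 'rxjs';
import { finalize } from 'rxjs/operators';

// Illustrative log of everything that happens, in order.
const log: string[] = [];

const subject = new Subject<number>();
// Each subscription to piped$ gets its own finalize() callback.
const piped$ = subject.pipe(finalize(() => log.push('finalize')));

const a = piped$.subscribe(v => log.push(`a:${v}`));
const b = piped$.subscribe(v => log.push(`b:${v}`));

subject.next(1);
a.unsubscribe();    // finalize runs for subscriber a
subject.next(2);    // only b is still listening
subject.complete(); // finalize runs for subscriber b

console.log(log);
```

Note that the callback runs once per subscriber, not once per Subject, which is why the service additionally has to look at how many observers are left.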

EDIT

app.component.ts

  show1 = true;
  show12 = true;
  show2 = true;

  v1$: Observable<any>;
  v12$: Observable<any>;
  v2$: Observable<any>;

  constructor(public valueService: ValueService) {
  }

  async ngOnInit() {
    await this.sleep(2000);
    // const s11 = this.valueService.subscribe(1, v => this.v1 = v);
    this.v1$ = this.valueService.subscribe(1);
    await this.sleep(2000);
    // const s21 = this.valueService.subscribe(2, v => this.v2 = v);
    this.v2$ = this.valueService.subscribe(2);
    await this.sleep(2000);
    // const s12 = this.valueService.subscribe(1, () => {});
    this.v12$ = this.valueService.subscribe(1);
    await this.sleep(2000);
    // s12.unsubscribe();
    this.show12 = false;
    await this.sleep(2000);
    // s11.unsubscribe();
    this.show1 = false;
    await this.sleep(2000);
    // s21.unsubscribe();
    this.show2 = false;
  }

app.component.html

<div *ngIf="show1">
  v1: {{ v1$ | async }}
</div>

<div *ngIf="show12">
  v12: {{ v12$ | async }}
</div>

<div *ngIf="show2">
  v2: {{ v2$ | async }}
</div>

value.service.ts

public subscribe(id: number): Observable<any> {
  let subj = this.subjects.get(id);

  if (!subj) {
    subj = new Subject<any>();
    this.subjects.set(id, subj);
  }

  return subj.pipe(
    finalize(() => {
      if (subj.observers.length === 1) {
        this.subjects.delete(id);
      }
    })
  )
}

StackBlitz.

As @ggradnig mentioned, the check should be subj.observers.length === 1, since finalize(), at least in RxJS 6.5.x, runs its callback before any other unsubscriptions take place. The last observer is therefore still in the Subject's observer list when the callback runs.
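
Because the `=== 1` check relies on the teardown order of a particular RxJS version, one possible variation (a sketch only, not taken from the StackBlitz above) is to count subscribers explicitly and to create the Subject lazily inside the `Observable` constructor. It assumes RxJS 6.x or 7.x. The `Entry` type, the `entries` map, the `emit` helper and the `sizes`/`received` arrays are hypothetical names introduced for illustration.

```typescript
import { Observable, Subject } from 'rxjs';

// Hypothetical entry type: a Subject plus an explicit subscriber count.
interface Entry {
  subject: Subject<any>;
  refCount: number;
}

class ValueService {
  // Public here only so the usage below can inspect the map size.
  readonly entries = new Map<number, Entry>();

  public subscribe(id: number): Observable<any> {
    // The function below runs per subscription, so nothing is added
    // to the map until someone actually subscribes.
    return new Observable<any>(subscriber => {
      let entry = this.entries.get(id);
      if (!entry) {
        entry = { subject: new Subject<any>(), refCount: 0 };
        this.entries.set(id, entry);
      }
      const current = entry;
      current.refCount++;
      const inner = current.subject.subscribe(subscriber);

      // Teardown: runs when this subscriber unsubscribes.
      return () => {
        inner.unsubscribe();
        current.refCount--;
        if (current.refCount === 0 && this.entries.get(id) === current) {
          this.entries.delete(id);
        }
      };
    });
  }

  // Hypothetical helper that pushes a value to the observers of an id.
  public emit(id: number, value: any): void {
    const entry = this.entries.get(id);
    if (entry) {
      entry.subject.next(value);
    }
  }
}

// Usage: track the map size at each step.
const service = new ValueService();
const sizes: number[] = [];
const received: any[] = [];

const v1$ = service.subscribe(1);
sizes.push(service.entries.size); // no subscription yet

const s1 = v1$.subscribe(v => received.push(v));
const s2 = service.subscribe(1).subscribe(() => {});
sizes.push(service.entries.size); // two subscribers, one Subject

service.emit(1, 'hello');
s1.unsubscribe();
sizes.push(service.entries.size); // s2 still holds the Subject
s2.unsubscribe();
sizes.push(service.entries.size); // last subscriber gone

console.log(sizes, received);
```

The explicit counter avoids reading `subj.observers`, and the returned Observable can still be bound in a template with the async pipe.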

Andrei Gătej
  • This looks interesting but does not seem to work as expected. 1) It adds the subject to the map even if no subscriptions are made on the observable. 2) Even if I do subscribe/unsubscribe the subject does not get removed. 3) I get a weird `this.handler.handle is not a function at MergeMapSubscriber.project` error at runtime (which maybe is the root cause for #2). – Robert Hegner Jun 26 '20 at 10:42
  • Could you create a stackblitz example? – Andrei Gătej Jun 26 '20 at 10:48
  • Good idea - will do that later today – Robert Hegner Jun 26 '20 at 10:55
  • I updated my question with two stackblitz examples. – Robert Hegner Jun 29 '20 at 09:13
  • @AndreiGătej This won't work - A subject will 1) not complete when it's unsubscribed and 2) won't be unsubscribed just because all subscribers have unsubscribed from it. – ggradnig Jun 29 '20 at 09:34
  • Robert Hegner thanks, I will have a look! @ggradnig I'm not sure where in my answer I was suggesting either `1)` or `2)`. I think `When a Subject..` should've been another paragraph. – Andrei Gătej Jun 29 '20 at 10:36
  • OP wants to run code when all subscribers from the Subjects are unsubscribed. You suggested `finalize` which will only trigger on `complete` - which won't happen when all subscribers have unsubscribed. – ggradnig Jun 29 '20 at 10:55
  • "finalize() is called when the source(e.g the Subject) is unsubscribed" - Finalize is only triggered on `complete`, not on `unsubscribe`. – ggradnig Jun 29 '20 at 10:56
  • "This can either happen when the component is destroyed" - Subjects are not unsubscribed just because their subscribers unsubscribe. – ggradnig Jun 29 '20 at 11:02
  • `Finalize is only triggered on complete` - I'd say it is called on `complete/error` and when the subscriber unsubscribes. [Demo](https://stackblitz.com/edit/rxjs-when-finalize-is-called?file=index.ts). – Andrei Gătej Jun 29 '20 at 11:09
  • @ggradnig `Subjects are not unsubscribed just because their subscribers unsubscribe` - I might not have chosen my words wisely. I didn't mean `subject.unsubscribe()`, but `s = subject.subscribe(); s.unsubscribe()` - isn't the subject unsubscribed here? – Andrei Gătej Jun 29 '20 at 11:12
  • @1: Only in case of Observable as a source, but not with Subject: https://stackblitz.com/edit/rxjs-when-finalize-is-called-nts5rt?file=index.ts – ggradnig Jun 29 '20 at 11:21
  • @ggradnig yeah, but in my answer it's different: https://stackblitz.com/edit/rxjs-when-finalize-is-called-pdwykd?file=index.ts – Andrei Gătej Jun 29 '20 at 11:25
  • @2: With subject the wording is a bit tricky: "Unsubscribing a subject" means disconnecting the subject from ITS source. This has other effects than "Unsubscribing from a Subject". – ggradnig Jun 29 '20 at 11:27
  • It doesn't print `finalize` in your latest StackBlitz for me – ggradnig Jun 29 '20 at 11:28
  • @ggradnig that's weird.. it should: https://stackblitz.com/edit/rxjs-when-finalize-is-called-pdwykd?file=index.ts – Andrei Gătej Jun 29 '20 at 11:30
  • Yeah now it works for me... anyway, this will cause any subscriber to trigger `finalize`, not only the last one: https://stackblitz.com/edit/rxjs-when-finalize-is-called-9fnvcu?file=index.ts – ggradnig Jun 29 '20 at 11:35
  • Ah, just seeing the observer array you're checking. Let me see if this works... – ggradnig Jun 29 '20 at 11:38
  • Hmm okay I guess you're right - with this approach it works: https://stackblitz.com/edit/rxjs-when-finalize-is-called-9fnvcu?file=index.ts Note though that `observers.length` is still 1 after the last unsubscribe - maybe that's the reason why it didn't work for OP. – ggradnig Jun 29 '20 at 11:40
  • @ggradnig thanks! It was your contribution as well! – Andrei Gătej Jun 29 '20 at 12:20