80

tl;dr: Basically I want to marry Angular's ngOnDestroy with the Rxjs takeUntil() operator. -- is that possible?

I have an Angular component that opens several Rxjs subscriptions. These need to be closed when the component is destroyed.

A simple solution for this would be:

class myComponent {

  private subscriptionA;
  private subscriptionB;
  private subscriptionC;

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    this.subscriptionA = this.serviceA.subscribe(...);
    this.subscriptionB = this.serviceB.subscribe(...);
    this.subscriptionC = this.serviceC.subscribe(...);
  }

  ngOnDestroy() {
    this.subscriptionA.unsubscribe();
    this.subscriptionB.unsubscribe();
    this.subscriptionC.unsubscribe();
  }

}

This works, but it's a bit redundant. I especially don't like that - The unsubscribe() is somewhere else, so you gotta remember that these are linked. - The component state is polluted with the subscription.

I would much prefer using the takeUntil() operator or something similar, to make it look like this:

class myComponent {

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    const destroy = Observable.fromEvent(???).first();
    this.subscriptionA = this.serviceA.subscribe(...).takeUntil(destroy);
    this.subscriptionB = this.serviceB.subscribe(...).takeUntil(destroy);
    this.subscriptionC = this.serviceC.subscribe(...).takeUntil(destroy);
  }

}

Is there a destroy event or something similar that would let me use takeUntil() or another way to simplify the component architecture like that? I realize I could create an event myself in the constructor or something that gets triggered within ngOnDestroy() but that would in the end not make things that much simpler to read.

shammelburg
  • 6,974
  • 7
  • 26
  • 34
marius
  • 1,533
  • 3
  • 16
  • 22

9 Answers9

105

You could leverage a ReplaySubject for that:

EDIT: Different since RxJS 6.x: Note the use of the pipe() method.

class myComponent {
  private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    this.serviceA
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
    this.serviceB
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
    this.serviceC
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
  }

  ngOnDestroy() {
    this.destroyed$.next(true);
    this.destroyed$.complete();
  }
}

This is only valid for RxJS 5.x and older:

class myComponentOld {
  private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);

  constructor(private serviceA: ServiceA) {}

  ngOnInit() {
    this.serviceA
      .takeUntil(this.destroyed$)
      .subscribe(...);
  }

  ngOnDestroy() {
    this.destroyed$.next(true);
    this.destroyed$.complete();
  }
}
Boyan Kushlev
  • 1,043
  • 1
  • 18
  • 35
olsn
  • 16,644
  • 6
  • 59
  • 65
  • 4
    In a way this is not what I wanted -- I wanted to avoid creating an extra state artifact in the component (`destroyed$`) and triggering it from `ngOnDestroy`. But I've come to realize after looking more that there is just no syntactic sugar to get around this. This is definitely already a nicer solution though than storing all the subscriptions. Thanks! – marius Mar 01 '17 at 14:32
  • 2
    There have been discussions in the angular team on how to make the destroy event easily accessible to rxjs in a component but as far as i kmow nothing has been implemented yet. – olsn Mar 01 '17 at 15:43
  • 6
    I would consider a `new ReplaySubject(1)` here. That way your component will stay in destroyed state and you're sure everything is completed. Other than that, nice answer :) – Dorus May 15 '17 at 21:40
  • 2
    @Dorus - What is the value of the Replay Subject over just a regular Subject here. As long as the subject is completed, why would you need the replay functionality? – EricJ Sep 14 '17 at 12:54
  • 21
    @EricJ The `replaySubject` will help to keep the component in an destroyed state even if you try to use any of the observable after `ngOnDestroy` has already been called. Any late subscriptions will instantly trigger the replayed value from the `replaySubject` and complete. – Dorus Sep 14 '17 at 14:22
  • Any Specific reason to use `replySubject` – Isac Feb 10 '23 at 11:08
  • 2
    @Isac the comment above yours explains this sufficiently, no? – QBrute Mar 03 '23 at 09:57
  • My bad, I totally skipped through the comments. – Isac Mar 07 '23 at 09:33
26

Using the componentDestroyed() function from the npm package @w11k/ngx-componentdestroyed is by far the easiest way to use takeUntil:

@Component({
  selector: 'foo',
  templateUrl: './foo.component.html'
})
export class FooComponent implements OnInit, OnDestroy {
  ngOnInit() {
    Observable.interval(1000)
      .takeUntil(componentDestroyed(this)) // <--- magic is here!
      .subscribe(console.log);
  }

  ngOnDestroy() {}
}

Here's a version of componentDestroyed() to include directly in your code:

// Based on https://www.npmjs.com/package/ng2-rx-componentdestroyed
import { OnDestroy } from '@angular/core';
import { ReplaySubject } from 'rxjs/ReplaySubject';

export function componentDestroyed(component: OnDestroy) {
  const oldNgOnDestroy = component.ngOnDestroy;
  const destroyed$ = new ReplaySubject<void>(1);
  component.ngOnDestroy = () => {
    oldNgOnDestroy.apply(component);
    destroyed$.next(undefined);
    destroyed$.complete();
  };
  return destroyed$;
}
Rene Hamburger
  • 2,003
  • 16
  • 17
15

Well, this comes down to what you mean by closing a subscription. There're basically two ways to do this:

  1. Using an operator that completes the chain (such as takeWhile()).
  2. Unsubscribe from the source Observable.

It's good to know that these two aren't the same.

When using for example takeWhile() you make the operator send complete notification which is propagated to your observers. So if you define:

...
.subscribe(..., ..., () => doWhatever());

Then when you complete the chain with eg. takeWhile() the doWhatever() function will be called.

For example it could look like this:

const Observable = Rx.Observable;
const Subject = Rx.Subject;

let source = Observable.timer(0, 1000);
let subject = new Subject();

source.takeUntil(subject).subscribe(null, null, () => console.log('complete 1'));
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 2'));
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 3'));

setTimeout(() => {
  subject.next();
}, 3000);

After 3s all the complete callbacks will be called.

On the other hand when you unsubscribe you're saying that you're no longer interested in the items produced by the source Observable. However this doesn't mean the source has to complete. You just don't care any more.

This means that you can collect all Subscriptions from .subscribe(...) calls and unsubscribe all of them at once:

let subscriptions = new Rx.Subscription();
let source = Observable.timer(0, 1000);

subscriptions.add(source.subscribe(null, null, () => console.log('complete 1')));
subscriptions.add(source.subscribe(null, null, () => console.log('complete 2')));
subscriptions.add(source.subscribe(null, null, () => console.log('complete 3')));

setTimeout(() => {
  subscriptions.unsubscribe();
}, 3000);

Now after 3s delay nothing will be printed to console because we unsubscribed and no complete callback was invoked.

So what you want to use is up to you and your use-case. Just be aware that unsubscribing is not the same as completing even though I guess in your situation it doesn't really matter.

martin
  • 93,354
  • 25
  • 191
  • 226
  • Maybe worth noting, that one should not start a stream, that must complete directly from within a component - any critical operation should be done through a service, which is not in danger of being destroyed through a route-change e.g. – olsn Feb 28 '17 at 08:35
  • I haven't actually encountered many completing streams so far in that context, because most are open-ended and the components just stop listening at some point. But I definitely think unsubscribing might long-term be the better pattern to apply here just out of principle, because that's what's supposed to logically happen. I'll think about it. Thanks! – marius Mar 01 '17 at 14:43
  • 1
    Consider `takeUntil(Rx.Observable.timer(3000))` on the stream. Indeed, with `takeUntil` you would `complete`, while with `unsubscribe` you *cancel*. – Dorus May 15 '17 at 21:43
6

Please Use Polymorphism with TakeUntil (April 13, 2022)

If you're writing protected destroy$ = new Subject<void>(); in every component that you make, then you should be asking yourself, "Why am I not following the DRY (Don’t Repeat Yourself) principle?"

To follow the DRY principle, create an abstract base component (abstract classes cannot be instantiated directly) that handles your destroy signal.

@Component({ template: '' })
export abstract class BaseComponent extends Subscribable {
  // Don't let the outside world trigger this destroy signal.
  // It's only meant to be trigger by the component when destroyed! 
  private _destroy = new Subject<void>();
  public destroy$ = this._destroy as Observable<void>;
  /** Lifecycle hook called by angular framework when extended class dies. */
  ngOnDestroy(): void {
    this._destroy.next();
  }
}

Make a handy extension function to simplify things.

declare module 'rxjs/internal/Observable' {
  interface Observable<T> {
    dieWith(comp: BaseComponent): Observable<T>;
  }
}

Observable.prototype.dieWith = function<T>(comp: BaseComponent): Observable<T> {
    return this.pipe(takeUntil(comp.destroy$));
};

Extend your BaseComponent whenever you need to handle subscriptions.

@Component({ ... })
export class myComponent extends BaseComponent {

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC
  ) {
    super();
  }

  ngOnInit() {
    this.serviceA.a$.dieWith(this).subscribe(...);
    this.serviceB.b$.dieWith(this).subscribe(...);
    this.serviceC.c$.dieWith(this).subscribe(...);
  }

}

You've officially handled subsciptions in Angular Components like a pro.

Your colleagues will thank you later!

Happy coding!

Kevin Baker
  • 569
  • 5
  • 8
4

If your component is directly tied to a route, you can avoid adding state by leveraging Router events with takeUntil(). That way, as soon as you navigate away from the component, it will clean up its subscriptions automatically for you.

import { Component, OnInit } from '@angular/core';
import { ActivatedRoute, Router } from '@angular/router';
import { MyService } from './my.service';
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/takeUntil';

@Component({
    ...
})
export class ExampleComopnent implements OnInit {

    constructor(private router: Router, private myService: MyService) {
    }

    ngOnInit() {
        this.myService.methodA()
            .takeUntil(this.router.events)
            .subscribe(dataA => {
                ...
            });

        this.myService.methodB()
            .takeUntil(this.router.events)
            .subscribe(dataB => {
                ...
            });
    }
}

Note: This simple example does not take into account guarded routes or canceled route navigation. If there is a chance that one of the router events could be triggered but route navigation gets cancelled, you'll need to filter on the router events so it gets triggered at the appropriate point - for example, after the Route Guard check or once navigation is complete.

this.myService.methodA()
    .takeUntil(this.router.events.filter(e => e instanceOf NavigationEnd))
    .subscribe(dataA => {
        ...
    });
Matt
  • 988
  • 9
  • 14
4

Create a base class

import { Subject } from 'rxjs/Rx';
import { OnDestroy } from '@angular/core';

 export abstract class Base implements OnDestroy {

 protected componentDestroyed$: Subject<any>;

constructor() {
    this.componentDestroyed$ = new Subject<void>();

    const destroyFunc = this.ngOnDestroy;
    this.ngOnDestroy = () => {
        destroyFunc.bind(this)();
        this.componentDestroyed$.next();
        this.componentDestroyed$.complete();
    };
}
// placeholder of ngOnDestroy. no need to do super() call of extended class.
public ngOnDestroy() {
    // no-op
}

}

The component would be,

extends the Base class

export class Test extends Base{
}

while you subscribe

service.takeUntil(this.componentDestroyed$
    .subscribe(...)

This is a global level change, whenever you want to subscribe use the same approach throughout the project. In any changes required you can modify in the base class

murthy naika k
  • 559
  • 6
  • 12
  • Does this work? I put a `console.log` inside the arrow function the line before `this.componentDestroyed$.next();` and it never gets invoked. – mtpultz Jan 31 '21 at 04:06
4

Angular 16 provides a new takeUntilDestroyed function which can be used like so in the constructor

import { Component } from "@angular/core";
import { takeUntilDestroyed } from "@angular/core/rxjs-interop";

@Component({
  selector: "my-component",
  templateUrl: "./my-component.html",
  styleUrls: ["./my-component.scss"]
})
export class MyComponent {

  constructor(private http: HttpClient) {
     this.http.get('/api')
       .pipe(takeUntilDestroyed())
       .subscribe();
  }
}

NOTE if you are trying to do the same outside of the constructor you may see this error takeUntilDestroyed() can only be used within an injection context such as a constructor To fix this update to the following

import { Component, DestroyRef, OnInit, inject } from "@angular/core";
import { takeUntilDestroyed } from "@angular/core/rxjs-interop";

@Component({
  selector: "my-component",
  templateUrl: "./my-component.html",
  styleUrls: ["./my-component.scss"]
})
export class MyComponent implements OnInit {
  destroyedRef = inject(DestroyRef);

  ngOnInit(): void {
     this.http.get('/api')
       .pipe(takeUntilDestroyed(this.destroyedRef))
       .subscribe();
  }
}
el-davo
  • 223
  • 6
  • 10
2

It's May 2023 and the Angular team have implemented takeUntilDestroyed() to handle this scenario for Angular 16 which is imminent: https://github.com/angular/angular/search?q=takeUntilDestroyed

santos
  • 434
  • 3
  • 6
  • [docs link](https://angular.io/api/core/rxjs-interop/takeUntilDestroyed). `takeUntliDestroyed` is part of the [developer preview](https://angular.io/guide/releases#developer-preview), so I wouldn't use it just yet. this is definitely going to become the correct answer soon though. – jemand771 May 10 '23 at 12:58
1

Since Angular 16 there is a new operator that does the job of unsubscribing and releasing resources automatically: takeUntilDestoryed

Example:

import { takeUntilDestroyed } from 'angular/core/rxjs-interop';

@Component({...})
export class AppComponent implements OnInit {

  destroyRef = inject(DestroyRef);
  
  constructor(private http: HttpClient) {}

  public ngOnInit(): void {
    this.http.get('/api').pipe(takeUntilDestroyed(this.destroyRef)).subscribe();
  }
}

Docs: https://angular.io/api/core/rxjs-interop/takeUntilDestroyed

Vega
  • 27,856
  • 27
  • 95
  • 103
Andrei Fara
  • 174
  • 7
  • The example you gave will throw this error, hence the downvote `takeUntilDestroyed() can only be used within an injection context such as a constructor` – el-davo Jun 13 '23 at 09:57
  • Good point, I missed that. I injected DestroyRef which can also be used by passing the reference to the operator. – Andrei Fara Jun 16 '23 at 14:45