27

Is it possible to get all the "active" subscriptions without storing them manually?

I'd like to unsubscribe all of the "active" subscriptions and don't want to reference each of them in an array or a variable.

Dave Newton
Nate
  • Can you just complete the observable? Usually if I want to unsubscribe all subscribers then I am done with the observable. This may not be true for you. Completing the observable will close all of your subscriptions. – bygrace Oct 31 '17 at 18:14
  • @bygrace I could complete all the observables but the goal is to avoid that. Some observables can take a very long time and I'd like to optimize that. – Nate Oct 31 '17 at 18:29
  • Would it be possible to expand on the motivation please? – Richard Matsen Oct 31 '17 at 20:51
  • @RichardMatsen I need to change component (angular 2) and would like to make sure nothing is pending. I can save each of my subscriptions in an array but I wanted to find an easier and cleaner way to unsubscribe them all before I switch view – Nate Oct 31 '17 at 20:59

7 Answers

13

It depends on whether you're using a Subject or an Observable, but there's probably no way to do this "automatically".

Observables

I don't think you can have such a thing as a "subscribed Observable", because you store either an Observable or a Subscription:

const source = Observable.of(...)
  .map(...);

const subscription = source
  .subscribe();

Here source represents an Observable and subscription represents a single subscription.

Note that you can have a Subscription instance that stores multiple other subscriptions:

const subscriptions = new Subscription();

const sub1 = Observable...subscribe();
const sub2 = Observable...subscribe();
const sub3 = Observable...subscribe();

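// Add each child separately; add() does not return the parent, so don't chain it
subscriptions.add(sub1);
subscriptions.add(sub2);
subscriptions.add(sub3);

// Then unsubscribe all of them with a single call
subscriptions.unsubscribe();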

Subjects

If you're using Subjects they do have the unsubscribe method themselves, see https://github.com/ReactiveX/rxjs/blob/master/src/Subject.ts#L96.

However, be aware that this makes the Subject "stopped"; for more info see https://medium.com/@martin.sikora/rxjs-subjects-and-their-internal-state-7cfdee905156.
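A minimal sketch of what "stopped" means in practice, assuming RxJS 6 or later (the `rxjs` import path and the variable names are assumptions, not part of the original answer). Calling `unsubscribe()` on the Subject should not throw by itself, but any later `next()` on the same Subject throws `ObjectUnsubscribedError`:

```typescript
import { Subject, ObjectUnsubscribedError } from 'rxjs';

const subject = new Subject<number>();
const received: number[] = [];

// Two independent subscribers on the same Subject.
subject.subscribe(v => received.push(v));
subject.subscribe(v => received.push(v * 10));

subject.next(1); // both subscribers receive the value

// Drops every observer at once and marks the Subject as closed.
subject.unsubscribe();

let errorAfterUnsubscribe: unknown;
try {
  subject.next(2); // the Subject can no longer be used after unsubscribe()
} catch (err) {
  errorAfterUnsubscribe = err;
}

console.log(received);
console.log(errorAfterUnsubscribe instanceof ObjectUnsubscribedError);
```

So this only fits the case where the Subject itself is no longer needed; to keep using the source, unsubscribe the individual Subscriptions instead.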

martin
  • Your method implies adding "manually" each observable in an array...which is what I want to avoid. – Nate Oct 31 '17 at 18:31
  • I tried calling `unsubscribe` on a Subject with two subscriptions, and it threw an `ObjectUnsubscribedError`. – Richard Matsen Oct 31 '17 at 20:13
  • Ref this question [ObjectUnsubscribedError when trying to prevent subscribing twice](https://stackoverflow.com/questions/38081878/objectunsubscribederror-when-trying-to-prevent-subscribing-twice), which basically indicates you can't use Subject.unsubscribe() for this purpose, but gives no explanation why. – Richard Matsen Oct 31 '17 at 20:22
  • Can you make a demo? The `unsubscribe()` method itself doesn't throw anything – martin Oct 31 '17 at 20:26
  • You can however unsubscribe Subject.observers without penalty. It's a bit tricky as each unsubscribe alters the observers array (you need to use `while(subject.observers.length)`). **BUT** I'd have my doubts about using this approach. – Richard Matsen Oct 31 '17 at 20:32
9

Yep. Just use the .observers property if you are using a Subject object from the rxjs package.

Hope this helps.

rplaurindo
6

One of the options is to use takeUntil in combination with Subject to stop all (Observable) subscriptions.

const terminator$ = new Subject<boolean>();

Observable.timer(1000)
  .do(i => console.log(`timer 1: ${i}`))
  .takeUntil(terminator$)
  .subscribe();

Observable.timer(5000)
  .do(i => console.log(`timer 2: ${i}`))
  .takeUntil(terminator$)
  .subscribe();

const stopSubscriptions = () => terminator$.next(true);
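The same idea can be sketched with the pipeable operators of RxJS 6 and later. The class name `ValueView`, the `destroy$` notifier and the Subject-based sources are hypothetical stand-ins used only for illustration; in Angular, `ngOnDestroy` would be called by the framework rather than by hand:

```typescript
import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Hypothetical component-like class that never stores a Subscription.
class ValueView {
  private readonly destroy$ = new Subject<void>();
  readonly received: number[] = [];

  constructor(first$: Observable<number>, second$: Observable<number>) {
    // Both subscriptions end as soon as destroy$ emits.
    first$.pipe(takeUntil(this.destroy$)).subscribe(v => this.received.push(v));
    second$.pipe(takeUntil(this.destroy$)).subscribe(v => this.received.push(v));
  }

  ngOnDestroy(): void {
    this.destroy$.next();     // ends every stream piped through takeUntil(destroy$)
    this.destroy$.complete(); // releases the notifier itself
  }
}

// Usage: Subjects stand in for real data sources.
const a$ = new Subject<number>();
const b$ = new Subject<number>();
const view = new ValueView(a$, b$);

a$.next(1);
b$.next(2);
view.ngOnDestroy();
a$.next(3); // ignored, the subscription has already ended
```

Note that `takeUntil` completes the downstream subscription rather than the source, so `a$` and `b$` stay usable by other subscribers.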
2

I think the basic problem is that an Observable (with the exception of Subject and its derivatives) does not keep a reference to its observers.

Without built-in references, you need to handle them externally in some form.

I think the best you could achieve is to create a reusable subscription 'override' to wrap the mechanism, although I doubt it's worth it.

import { Observable } from 'rxjs/Observable';
import { Subscription } from 'rxjs/Subscription';

const subscribeAndGuard = function(component, fnData, fnError = null, fnComplete = null) {

  // Define the subscription
  const sub: Subscription = this.subscribe(fnData, fnError, fnComplete);

  // Wrap component's onDestroy
  if (!component.ngOnDestroy) {
    throw new Error('To use subscribeAndGuard, the component must implement ngOnDestroy');
  }
  const saved_OnDestroy = component.ngOnDestroy;
  component.ngOnDestroy = () => {
    console.log('subscribeAndGuard.onDestroy');
    sub.unsubscribe();
    // Note: need to put original back in place
    // otherwise 'this' is undefined in component.ngOnDestroy
    component.ngOnDestroy = saved_OnDestroy;
    component.ngOnDestroy();

  };

  return sub;
};

// Create an Observable extension
Observable.prototype.subscribeAndGuard = subscribeAndGuard;

// Ref: https://www.typescriptlang.org/docs/handbook/declaration-merging.html
declare module 'rxjs/Observable' {
  interface Observable<T> {
    subscribeAndGuard: typeof subscribeAndGuard;
  }
}

See also this question: "Angular/RxJs When should I unsubscribe from Subscription".
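The wrapping of ngOnDestroy can also be sketched as a standalone helper, without patching `Observable.prototype`. Everything here is hypothetical: `guardSubscription`, the two structural interfaces and the fake component are illustration only, and the sketch uses no RxJS or Angular imports so that it runs on its own:

```typescript
// Structural types: anything with unsubscribe() / ngOnDestroy() qualifies.
interface GuardedSub { unsubscribe(): void; }
interface GuardedComponent { ngOnDestroy(): void; }

// Hypothetical helper: ties a subscription's lifetime to the component's ngOnDestroy.
function guardSubscription<T extends GuardedSub>(component: GuardedComponent, sub: T): T {
  const originalOnDestroy = component.ngOnDestroy;
  component.ngOnDestroy = () => {
    sub.unsubscribe();
    // Call the previous handler with the component as `this`.
    originalOnDestroy.call(component);
  };
  return sub;
}

// Usage with stand-ins for a component and two subscriptions.
const events: string[] = [];
const fakeComponent: GuardedComponent = {
  ngOnDestroy() { events.push('component destroyed'); },
};
const fakeSub = (name: string): GuardedSub => ({
  unsubscribe: () => { events.push(`${name} unsubscribed`); },
});

guardSubscription(fakeComponent, fakeSub('first'));
guardSubscription(fakeComponent, fakeSub('second'));
fakeComponent.ngOnDestroy();
```

Each call wraps the previous handler, so the most recently guarded subscription is released first and the component's own ngOnDestroy runs last.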

Richard Matsen
2

You can just store one Subscription, and add to it.

private _subscriptions = new Subscription();   // yes you can do this!

Then add to it

this._subscriptions.add(
    this.layoutManager.width$.subscribe((w) => {

        // any nested subscriptions can also use this mechanism
        this._subscriptions.add(...);
    }));

Then in ngOnDestroy() you just call _subscriptions.unsubscribe();.

I've been trying to make a 'clever' way to do this, but to be frank I've overengineered it quite a bit (including logging) and this is the simplest way I've come across.

Simon_Weaver
  • Well if you ever come across anything please update b/c really the 1 thing i have yet to find is how to unsub before subbing again, in a "clever" way for individual methods that need this functionality without an extra variable to unsub (if exists)... – Don Thomas Boyle Feb 15 '19 at 15:27
  • I’m not sure what you’re looking for. Unless you’re talking about an async function (that contains await) then you’d be defeating the purpose of RxJs by unsubscribing from an observable at the end of a method. If you’re doing take(1) and trying to get an immediate value for something then you don’t need to unsubscribe anyway. I use a helper service which has the benefit of calling ngOndestroy automatically when the component is destroyed, but it comes with more added complications. – Simon_Weaver Feb 15 '19 at 20:59
  • Here's what i'm talking about - ended up just extending the functionality. The wiki and stackblitz should explain :) thanks for being interested! https://github.com/dontboyle/rxjs-serial-subscription/wiki – Don Thomas Boyle Feb 16 '19 at 19:25
1

You can inherit from a Base component that implements the OnDestroy interface and keeps an array of all the Subscriptions; when the component is destroyed, the base class unsubscribes every Subscription in that array. The code is below.

  1. Create a Base class that implements OnInit, OnDestroy
import { OnDestroy, OnInit } from '@angular/core';
import { Subscription } from 'rxjs';

export class BaseComponent implements OnInit, OnDestroy {

  // Every subscription registered through addObserver()
  subscriptions: Subscription[] = [];

  ngOnInit(): void {
  }

  ngOnDestroy(): void {
    console.log('OnDestroy - BaseClass');
    this.subscriptions.forEach(sub => {
      sub.unsubscribe();
      console.log('Unsub');
    });
  }

  addObserver(subscription: Subscription) {
    this.subscriptions.push(subscription);
  }
}
  2. Create a component and inherit the Base class
export class SomeComponent extends BaseComponent implements OnInit, OnDestroy {

    constructor(private someService: SomeService) {
      super();
    }

    ngOnInit(): void {
      super.ngOnInit();
      // Register the subscriptions once the component is initialised
      this.initObservers();
    }

    ngOnDestroy(): void {
      super.ngOnDestroy();
    }

    private initObservers() {
      super.addObserver(this.someService.someObservable$.subscribe(res => {

      }));
    }
}

That's it. You no longer need to worry about unsubscribing each Subscription manually; BaseComponent will take care of that.

Important point: you must pass each subscription to the superclass, which keeps the list and unsubscribes all Subscriptions on the component's destroy lifecycle event.

super.addObserver(this.someService.someObservable$.subscribe(res => {

}));

Thanks!!

Nandhakumar Appusamy
0

The easiest way is to make use of ngx-auto-unsubscribe. With this decorator you don't need to call unsubscribe() yourself; it is done automatically.

To use it, you only need to declare ngOnDestroy; the method body can be left empty.

It is very easy to use, I encourage you to use it.
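To illustrate why the destroy hook has to exist, here is a rough sketch of how a decorator of this kind can work. It is not the library's actual code: `autoUnsubscribeSketch` and `InboxSketch` are hypothetical names, the sketch assumes the mechanism is "wrap ngOnDestroy and unsubscribe every property that has an unsubscribe() method", and it is applied as a plain function call so that no decorator compiler flags are needed:

```typescript
type DestroyableCtor = { prototype: { ngOnDestroy(): void } };

// Hypothetical stand-in for an auto-unsubscribe class decorator.
function autoUnsubscribeSketch(ctor: DestroyableCtor): void {
  const original = ctor.prototype.ngOnDestroy;
  ctor.prototype.ngOnDestroy = function (this: object): void {
    const self = this as Record<string, unknown>;
    // Unsubscribe every own property that looks like a subscription.
    for (const key of Object.keys(self)) {
      const value = self[key] as { unsubscribe?: unknown } | null | undefined;
      if (value && typeof value.unsubscribe === 'function') {
        value.unsubscribe();
      }
    }
    // Then run the component's own (possibly empty) destroy hook.
    original.apply(this);
  };
}

// Usage: plain objects stand in for Subscriptions stored on component properties.
class InboxSketch {
  readonly log: string[] = [];
  one = { unsubscribe: () => { this.log.push('one'); } };
  two = { unsubscribe: () => { this.log.push('two'); } };

  ngOnDestroy(): void {
    this.log.push('destroyed');
  }
}

autoUnsubscribeSketch(InboxSketch);
const inbox = new InboxSketch();
inbox.ngOnDestroy();
```

Under this assumption, only subscriptions stored as properties on the component instance are cleaned up; a subscription that is never assigned to a property would be missed.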

Npm installation

Official documentation