A couple of things from the source code,
(do) first() and take(1) actually unsubscribe when they complete
It would appear so.
take.ts
protected _next(value: T): void {
const total = this.total;
const count = ++this.count;
if (count <= total) {
this.destination.next(value);
if (count === total) {
this.destination.complete();
this.unsubscribe();
}
}
}
To say an observable completes, does that also mean the subscriptions are unsubscribed?
In Subscription.ts, came across this (not seen it in documentation)
/**
* Adds a tear down to be called during the unsubscribe() of this
* Subscription.
*
...
*/
add(teardown: TeardownLogic): Subscription {
so I figured teardown could be used to verify that unsubscribe is called.
const source1 = Observable.range(1, 10).take(6)
const subscription1 = source1.subscribe(x => console.log('subscription1'))
.add(() => console.log('teardown1'))
// Emits 6x then 'teardown1'
const subscription2 = source1.take(4).subscribe(x => console.log('subscription2'))
.add(() => console.log('teardown2'))
// Emits 4x then 'teardown2'
but note that take()
only unsubscribes downstream of itself, not all subscribers to the observable
const source2 = Observable.range(1, 10)
const subscription3 = source2.subscribe(x => console.log('subscription3'))
.add(() => console.log('teardown3'))
// Emits 10x then 'teardown3'
const subscription4 = source2.take(5).subscribe(x => console.log('subscription4'))
.add(() => console.log('teardown4'))
// Emits 5x then 'teardown4'
I need to know that the observable isn't retaining any references after first() or take(1) complete.
This is a bit trickier, here is the Observable.subscribe()
method. Seems the observer isn't keeping a reference to any of it's three parameters, but rather the subscription is keeping a reference to the observable.
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription {
const { operator } = this;
const sink = toSubscriber(observerOrNext, error, complete);
if (operator) {
operator.call(sink, this.source);
} else {
sink.add(this.source ? this._subscribe(sink) : this._trySubscribe(sink));
}
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
return sink;
}
This can be seen in the following test code. While the subscription is active (closed: false), the subscriber has the observable referenced in _subscriptions
const source3 = Observable.interval(1000)
const subscription5 = source3.subscribe(x => {})
console.log(source3)
console.log(subscription5)
Console output:
IntervalObservable
period: 1000
scheduler: AsyncScheduler {...}
_isScalar: false
__proto__: Observable
Subscriber
closed: false
destination: SafeSubscriber {...}
isStopped: false
syncErrorThrowable: false
syncErrorThrown: false
syncErrorValue: null
_parent: null
_parents: null
_subscriptions: [AsyncAction]
__proto__: Subscription
but when we close the subscription with a take(1)
,
const source3 = Observable.interval(1000).take(1)
const subscription5 = source3.subscribe(x => {})
console.log(source3)
console.log(subscription5)
Subscriber _subscriptions
is set to null, releasing the reference.
Subscriber
closed: true
destination: SafeSubscriber {...}
isStopped: true
syncErrorThrowable: false
syncErrorThrown: false
syncErrorValue: null
_parent: null
_parents: null
_subscriptions: null
__proto__: Subscription
I'm not sure this can be considered a definitive proof for all observable/operator/subscriber chains, but is at least indicative of a way to verify your particular use-case.