28

Let's say we have an Observable:

var observable = Rx.Observable
    .fromEvent(document.getElementById('emitter'), 'click');

How can I make it Complete (what will trigger onComplete event for all subscribed Observers) ?

ulfryk
  • 683
  • 1
  • 6
  • 15

4 Answers4

41

In this present form, you cannot. Your observable is derived from a source which does not complete so it cannot itself complete. What you can do is extend this source with a completing condition. This would work like :

var end$ = new Rx.Subject();
var observable = Rx.Observable
    .fromEvent(document.getElementById('emitter'), 'click')
    .takeUntil(end$);

When you want to end observable, you do end$.onNext("anything you want here");. That is in the case the ending event is generated by you. If this is another source generating that event (keypress, etc.) then you can directly put an observable derived from that source as an argument of takeUntil.

Documentation:

user3743222
  • 18,345
  • 5
  • 69
  • 75
  • This seems to be potential danger of memory leaks in browser app. It seems that there can be infinite number of Observers connected with HTML Elements already removed from scope. It may prevent GC from removing those elements, and many other refs from memory… – ulfryk Dec 04 '15 at 21:04
  • 4
    ?? I don't follow you. What is 'this'? In any case, the `onCompleted` handler will be triggered for all observers, which is what you asked. About memory leaks, when the last observer has unsubscribed from the observable created by `fromEvent`, the event listener that was created is removed. – user3743222 Dec 04 '15 at 21:14
  • "what will trigger onComplete event for all subscribed Observers" is second additional part of my question. What I need to achieve is removal of event listener. Full completion of Observable so I can remove all obsolete references when I do not listen for event anymore. – ulfryk Dec 04 '15 at 21:17
  • This is exactly what should happen. Try the code and see if it works first. – user3743222 Dec 04 '15 at 21:18
  • 1
    > About memory leaks, when the last observer has unsubscribed from the observable created by `fromEvent`, the event listener that was created is removed. Oh - that is what makes your answer truly helpful. Thx ;) – ulfryk Dec 04 '15 at 21:18
  • 1
    Is this still an appropriate approach to this problem? I'm trying to devise a way to create a cancel token to cancel multiple observables on a "cancel" method evocation. – Con Antonakos Mar 30 '17 at 23:41
  • post a question on SO with as much detail as possible – user3743222 Mar 31 '17 at 00:41
7

What worked for me is using the take() operator. It will fire the complete callback after x number of events. So by passing 1, it will complete after the first event.

Typescript:

private preloadImage(url: string): Observable<Event> {
    let img = new Image();
    let imageSource = Observable.fromEvent(img, "load");

    img.src = url;

    return imageSource.take(1);
}
Tomas Klingen
  • 141
  • 1
  • 5
2

I think what you are looking for is the dispose() method.

from: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md#cold-vs-hot-observables

Notice that the subscribe method returns a Disposable, so that you can unsubscribe to a sequence and dispose of it easily. When you invoke the dispose method on the observable sequence, the observer will stop listening to the observable for data. Normally, you do not need to explicitly call dispose unless you need to unsubscribe early, or when the source observable sequence has a longer life span than the observer. Subscriptions in Rx are designed for fire-and-forget scenarios without the usage of a finalizer. Note that the default behavior of the Observable operators is to dispose of the subscription as soon as possible (i.e, when an onCompleted or onError messages is published). For example, the code will subscribe x to both sequences a and b. If a throws an error, x will immediately be unsubscribed from b.

  • 4
    if the question asked: how to unsubscribe an observable of a click listener then this answer would apply. But the question asked how to 'complete' an observable$ -- according to this https://stackoverflow.com/a/52198997/3136861 by @dK- `Calling unsubscribe() itself does not call complete method` – apollo Jul 07 '19 at 03:31
  • Yeah, it is needed to finish an observable stream itself, but not just unsubscribe from the observable stream. – ame Nov 06 '20 at 12:08
-1

I found an easier way to do this for my use case, If you want to do something when the observable is complete then you can use this:

const subscription$ = interval(1000).pipe(
  finalize(() => console.log("Do Something")),
).subscribe();

The finalize is triggered on complete, when all subscriptions are unsubscribed etc.

All2Pie
  • 310
  • 3
  • 13
  • This only completes the observable if it is configured to complete when its refCount falls under 1. This is not the same as explicitly completing the observable. – Mark Clark May 14 '21 at 19:27