After creating an Observable like so

var source = Rx.Observable.create(function(observer) {...});

What is the difference between subscribe

source.subscribe(function(x) {});

and forEach

source.forEach(function(x) {});
Kevin Le - Khnle

2 Answers

In the ES7 spec, which RxJS 5.0 follows (but RxJS 4.0 does not), the two are NOT the same.

subscribe

public subscribe(observerOrNext: Observer | Function, error: Function, complete: Function): Subscription

Observable.subscribe is where you will do most of your true Observable handling. It returns a subscription token, which you can use to cancel your subscription. This is important when you do not know the duration of the events/sequence you have subscribed to, or if you may need to stop listening before a known duration.
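To make the cancellation point concrete, here is a minimal sketch. It uses a hand-rolled stand-in rather than RxJS itself, so `createManualSource` and `emit` are hypothetical names invented for illustration; only the shape of `subscribe` returning an object with `unsubscribe()` mirrors what is described above.

```javascript
// Toy stand-in for an Observable (NOT the RxJS implementation): it only
// models the part of subscribe() discussed above, the returned subscription.
function createManualSource() {
  const observers = new Set();
  return {
    // Push a value to every observer that is still subscribed.
    emit(value) {
      observers.forEach((observer) => observer.next(value));
    },
    // Register an observer and hand back a token that can cancel it.
    subscribe(next) {
      const observer = { next };
      observers.add(observer);
      return {
        unsubscribe() {
          observers.delete(observer);
        },
      };
    },
  };
}

const source = createManualSource();
const received = [];
const subscription = source.subscribe((x) => received.push(x));

source.emit(1);
source.emit(2);
subscription.unsubscribe(); // stop listening before the source is done
source.emit(3);             // no longer delivered to our callback

console.log(received); // [ 1, 2 ]
```

The caller decides when to stop listening, which is what you want when the source may never complete on its own.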

forEach

public forEach(next: Function, PromiseCtor?: PromiseConstructor): Promise

Observable.forEach returns a promise that will either resolve or reject when the Observable completes or errors. It is intended to clarify situations where you are processing an observable sequence of bounded/finite duration in a more 'synchronous' manner, such as collating all the incoming values and then presenting once, by handling the promise.

Effectively, you can act on each value, as well as error and completion events either way. So the most significant functional difference is the inability to cancel a promise.
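A rough sketch of that difference, again with a toy Observable rather than the real library: `createObservable` is a hypothetical helper, and this `forEach` is only an assumption about the general shape (subscribe internally, expose a promise). It does not reproduce everything RxJS does, for example how it handles a throwing callback.

```javascript
// Toy stand-in for an Observable (NOT the RxJS implementation). The
// subscriber function receives an observer and may call next/error/complete.
function createObservable(subscriber) {
  return {
    subscribe(next, error, complete) {
      let closed = false;
      const observer = {
        next: (v) => { if (!closed && next) next(v); },
        error: (e) => { if (!closed) { closed = true; if (error) error(e); } },
        complete: () => { if (!closed) { closed = true; if (complete) complete(); } },
      };
      subscriber(observer);
      // The caller of subscribe() gets a handle it can use to stop listening.
      return { unsubscribe() { closed = true; } };
    },
    // forEach subscribes internally and exposes only a Promise, so the
    // caller gets completion/error handling but no way to unsubscribe.
    forEach(next) {
      return new Promise((resolve, reject) => {
        this.subscribe(next, reject, resolve);
      });
    },
  };
}

// A finite source: two values, then completion.
const finite = createObservable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
});

const collected = [];
const done = finite.forEach((x) => collected.push(x));

// The promise settles once the sequence has completed (or errored).
done.then(() => console.log("completed with", collected));
```

Note that `done` is a plain promise: there is nothing on it to call if you change your mind halfway through, so this style fits sequences you expect to finish on their own.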

shannon
  • "In the ES7 spec, which RxJS 5.0 follows (but RxJS 4.0 does not), the two are NOT the same." It's very important to notice that specifications change over time, which is annoying: something that is correct now may not have been in the past, or may not be in the future. So a correct answer always has to specify which version of what it applies to (and this one does). – pearpages Feb 01 '17 at 10:22
  • With forEach, do we need to unsubscribe, or does it handle that by itself? – Samiullah Khan Jan 13 '19 at 08:08
  • @SamiullahKhan : Your question requires more complexity than yes/no to answer. `forEach` subscribes internally, and then returns a promise; with that promise you have no way to unsubscribe. However, you also have no way to cancel, so you probably assume it will complete or fail without your intervention. The conventional design expectation of observable libraries is that `Unsubscribe` happens automatically upon either `Complete` or `Error`. Therefore, if `forEach` conceptually works for you, then no, you don't need to unsubscribe. `forEach` does not `Unsubscribe` but `Complete` and `Error` do. – shannon Jan 13 '19 at 17:10
  • So, as I understand, `forEach` is useful for something like `await obs.forEach(doSomething)`, where your method will finish after the observable completes. On the other hand, when the lifetime of the observer may be shorter than the one of the observable, you'll use `.subscribe()` with the corresponding `.unsubscribe()`. – pmoleri Jan 20 '21 at 19:00

I just reviewed the latest code available. Technically, foreach/forEach simply calls subscribe in RxScala, RxJS, and RxJava, so there does not seem to be a big difference. They now have a return type that gives the user a way to stop the subscription, or something similar.

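When I worked with an earlier version of RxJava, subscribe returned a Subscription and forEach was just void, so you may see different answers because of these changes. The snippets below are, in order, from RxScala, RxJS, and RxJava (an older void version followed by a newer one that returns a Disposable).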

/**
 * Subscribes to the [[Observable]] and receives notifications for each element.
 *
 * Alias to `subscribe(T => Unit)`.
 *
 * $noDefaultScheduler
 *  
 * @param onNext function to execute for each item.
 * @throws java.lang.IllegalArgumentException if `onNext` is null
 * @throws rx.exceptions.OnErrorNotImplementedException if the [[Observable]] tries to call `onError`
 * @since 0.19
 * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
 */
def foreach(onNext: T => Unit): Unit = {
    asJavaObservable.subscribe(onNext)
}

def subscribe(onNext: T => Unit): Subscription = {
    asJavaObservable.subscribe(scalaFunction1ProducingUnitToAction1(onNext))
}

/**
 *  Subscribes an o to the observable sequence.
 *  @param {Mixed} [oOrOnNext] The object that is to receive notifications or an action to invoke for each element in the observable sequence.
 *  @param {Function} [onError] Action to invoke upon exceptional termination of the observable sequence.
 *  @param {Function} [onCompleted] Action to invoke upon graceful termination of the observable sequence.
 *  @returns {Disposable} A disposable handling the subscriptions and unsubscriptions.
 */
observableProto.subscribe = observableProto.forEach = function (oOrOnNext, onError, onCompleted) {
  return this._subscribe(typeof oOrOnNext === 'object' ?
    oOrOnNext :
    observerCreate(oOrOnNext, onError, onCompleted));
};

/**
 * Subscribes to the {@link Observable} and receives notifications for each element.
 * <p>
 * Alias to {@link #subscribe(Action1)}
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code forEach} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * 
 * @param onNext
 *            {@link Action1} to execute for each item.
 * @throws IllegalArgumentException
 *             if {@code onNext} is null
 * @throws OnErrorNotImplementedException
 *             if the Observable calls {@code onError}
 * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
 */
public final void forEach(final Action1<? super T> onNext) {
    subscribe(onNext);
}

public final Disposable forEach(Consumer<? super T> onNext) {
    return subscribe(onNext);
}