39

Under the subjects package you have classes like PublishSubject and BehaviorSubject which I suppose can be described as some usable sample Observables.

How can these subjects be unsubscribed from? There is no unsubscribe method and calling onCompleted ends the Observable altogether right?

Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
arinte
  • 3,660
  • 10
  • 45
  • 65

3 Answers3

72

A Subject is an Observable and an Observer at the same time, it can be unsubscribed from just like normal observables. What makes subject special is that it is sort of bridge between observables and observers. It can pass through the items it observes by reemitting them, and it can also emit new items. Subjects are to observables as promises are to futures.

Here is a brief description of the subjects family:

AsyncSubject: only emits the last value of the source Observable

BehaviorSubject: emits the most recently emitted item and all the subsequent items of the source Observable when a observer subscribe to it.

PublishSubject: emits all the subsequent items of the source Observable at the time of the subscription.

ReplaySubject: emits all the items of the source Observable, regardless of when the subscriber subscribes.

the official doc comes with some nice marble diagrams which makes it more easier to understand

Misha Akopov
  • 12,241
  • 27
  • 68
  • 82
Septem
  • 3,614
  • 1
  • 20
  • 20
  • 2
    Is BehaviorSubject essentially the same as ReplaySubject(1)? – Dzmitry Lazerka Dec 11 '15 at 10:00
  • 2
    @DzmitryLazerka: BehaviorSubject always has an initial value. If you are looking for BehaviorSubject without initial value then use ReplaySubject(1). – gregschlom Jun 03 '16 at 22:23
  • Great answer though slight correction - ReplaySubject can also specify a buffer in the create(), which will be the number of items to emit on subscription – jklp Mar 31 '17 at 03:09
30

Subjects are essentially are both Observables and Observers.

An Observable is essentially a thing that has a function that takes an Observer and returns a subscription. So, for example, given simple observable:

    Observable<Integer> observable = Observable.create(new Observable.OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(3);
            observer.onNext(2);
            observer.onNext(1);
            observer.onCompleted();

            return Subscriptions.empty();
        }
    });

And here we would subscribe to it, to print out a line for each integer:

    Subscription sub = observable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            System.out.println(integer);
        }
    });

You would unsubscribe on the above by calling sub.unsubscribe().

Here is a PublishSubject that does roughly the same thing:

    PublishSubject<Integer> publishSubject = PublishSubject.create();
    Subscription subscription = publishSubject.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            System.out.println(integer);
        }
    });

    publishSubject.onNext(3);
    publishSubject.onNext(2);
    publishSubject.onNext(1);

You would unsubscribe from it the same way, by calling subscription.unsubscribe().

martiansnoop
  • 2,316
  • 2
  • 16
  • 16
  • could you please describe how do we make subscribers handle onNext in parallel? if there were several subscribers not just one how would we make them execute in parallel – vach Feb 01 '15 at 10:20
  • 3
    @vach - as per the above code you could make subscribers handle onNext in parallel by doing something like below: `Subscription subscription1 = publishSubject.observeOn(Schedulers.newThread()).subscribe(...);` and `Subscription subscription2 = publishSubject.observeOn(Schedulers.newThread()).subscribe(...);` – Praveer Gupta Sep 29 '15 at 05:42
  • I always wondered how to do this and I have not understood it until now, thx @praveer09 :) – Diego Palomar Oct 05 '15 at 14:22
6

All Subjects extend Observable which you can subscribe to using any of the multiple subscribe(...) methods. Call to any of the subscribe(...) methods returns a Subscription.

Subscription subscription = anySubject.subscribe(...);

Use this subscription instance's unsubscribe() method when you want to stop listening to the events from the Subject.

subscription.unsubscribe();
Praveer Gupta
  • 3,940
  • 2
  • 19
  • 21