I have the following Hot Observable:

hotObservable = Observable.interval(0L, 1L, TimeUnit.SECONDS)
                          .map((t) -> getCurrentTimeInMillis());

However, I can't find a good way to stop it. I was able to partially solve this using takeWhile and a boolean flag (runTimer):

Observable.interval(0L, 1L, TimeUnit.SECONDS)
          .takeWhile((t) -> runTimer)
          .map((t) -> getCurrentTimeInMillis());

There are 2 things I don't like in this approach though:

  1. I must keep the flag runTimer around, which I don't want.
  2. Once runTimer becomes false, the Observable simply completes, which means that if I want to emit again I need to create a new Observable. I don't want that. I just want the Observable to stop emitting items until I tell it to start again.

I was hoping for something like this:

hotObservable.stop();
hotObservable.resume();

That way I don't need to keep any flags around and the observable is always alive (it might not be emitting events though).

How can I achieve this?

  • Should the observable stop *producing* items, or continue operating but stop *emitting* them? – npace Dec 10 '16 at 00:07
  • @npace Stop producing would be ideal, but I'd be happy with stop emitting only as well. Both work in my situation. – Bitcoin Cash - ADA enthusiast Dec 10 '16 at 00:11
  • Kiskae's answer is perfectly valid, but just FYI: if you replace the `takeWhile` operator in your previous solution with the `filter` operator, you get rid of the second disadvantage. – koperko Dec 12 '16 at 13:43

2 Answers

One possible approach uses a BehaviorSubject and a switchMap:

BehaviorSubject<Boolean> subject = BehaviorSubject.create(true);
hotObservable = subject.distinctUntilChanged().switchMap((on) -> {
    if (on) {
        return Observable.interval(0L, 1L, TimeUnit.SECONDS);
    } else {
        return Observable.never();
    }
}).map((t) -> getCurrentTimeInMillis());

By sending booleans to the subject the output of the observable can be controlled. subject.onNext(true) will cause any observable created using that subject to begin emitting values. subject.onNext(false) disables that flow.

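switchMap disposes the previous inner observable whenever the subject emits a new value, so switching off cancels the running interval. distinctUntilChanged ensures that sending the same value twice in a row does not trigger an unnecessary switch, which would otherwise restart the interval.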
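
To make the on/off semantics concrete without tying the example to a particular RxJava version, here is a rough JDK-only sketch of the same idea. It is not RxJava and not the code from this answer: PausableTicker, setOn, isRunning and startCount are hypothetical names. Turning the switch on schedules a fresh periodic task (like switching to a new interval), turning it off cancels that task (like switching to never), and repeating the current value does nothing (like distinctUntilChanged).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

/** Hypothetical helper illustrating switch-controlled ticking; not part of RxJava. */
final class PausableTicker {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final long periodMillis;
    private final LongConsumer listener;
    private ScheduledFuture<?> task; // null while switched off
    private int starts;              // how many times a periodic task was started

    PausableTicker(long periodMillis, LongConsumer listener) {
        this.periodMillis = periodMillis;
        this.listener = listener;
    }

    /** Switches ticking on or off; a repeated value is ignored. */
    synchronized void setOn(boolean on) {
        if (on == (task != null)) {
            return; // state unchanged, nothing to switch
        }
        if (on) {
            starts++;
            // Start a new periodic task that reports the current time.
            task = scheduler.scheduleAtFixedRate(
                    () -> listener.accept(System.currentTimeMillis()),
                    0L, periodMillis, TimeUnit.MILLISECONDS);
        } else {
            // Cancel the running task; the ticker itself stays usable.
            task.cancel(false);
            task = null;
        }
    }

    synchronized boolean isRunning() {
        return task != null;
    }

    synchronized int startCount() {
        return starts;
    }

    /** Releases the scheduler thread; the ticker cannot be used afterwards. */
    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        PausableTicker ticker =
                new PausableTicker(100L, t -> System.out.println("tick at " + t));
        ticker.setOn(true);   // starts ticking
        Thread.sleep(350L);
        ticker.setOn(false);  // pauses; nothing is printed while off
        Thread.sleep(350L);
        ticker.setOn(true);   // resumes with a fresh periodic task
        Thread.sleep(350L);
        ticker.shutdown();
    }
}
```

As in the switchMap version, resuming starts a new timer rather than continuing the old one, so the first tick after a resume arrives immediately.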

Kiskae

Maybe you can use this:

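// Assumption: `stop` is not defined in the original snippet; a PublishSubject is one option.
PublishSubject<Integer> stop = PublishSubject.create();

Observable.interval(3, TimeUnit.SECONDS)
        .takeUntil(stop)   // completes the stream once `stop` emits
        .subscribe(new MyObserver());

Thread.sleep(10000);
stop.onNext(-1);           // any item works; this ends the interval for good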
bitristan