43

This will emit a tick every 5 seconds.

Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
            .subscribe(tick -> Log.d(TAG, "tick = "+tick));

To stop it you can use

Schedulers.shutdown();

But then all the Schedulers stops and it is not possible to resume the ticking later. How can I stop and resume the emiting "gracefully"?

Jan Seevers
  • 768
  • 1
  • 11
  • 20
  • I think the best answer to this should be this one: http://stackoverflow.com/questions/35782767/how-can-an-observable-be-paused-without-losing-the-items-emitted?answertab=votes#tab-top, where Scan() is used, so there is not the need to accumulate a value externally (like suggested in the best answer) – Pedro Lopes Jan 13 '17 at 00:04

7 Answers7

44

Here's one possible solution:

class TickHandler {

    private AtomicLong lastTick = new AtomicLong(0L);
    private Subscription subscription;

    void resume() {
        System.out.println("resumed");
        subscription = Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
                                 .map(tick -> lastTick.getAndIncrement())
                                 .subscribe(tick -> System.out.println("tick = " + tick));
    }

    void stop() {
        if (subscription != null && !subscription.isUnsubscribed()) {
            System.out.println("stopped");
            subscription.unsubscribe();
        }
    }
}
AndroidEx
  • 15,524
  • 9
  • 54
  • 50
  • @MathijsSegers If I understood your question correctly, `Observable.interval` is a cold observable, so unsubscribing from it makes it stop its emissions. At the same time, you could probably rely on the implementation of `Subscription` (or `Disposable` in RxJava 2) to also throw away the reference to the observable on `unsubscribe()` (`dispose()`). Or you can null out the `subscription` reference and make it GC-eligible for sure, together with the observable stored in it. – AndroidEx Dec 01 '17 at 14:05
  • 1
    Curiosity question here. Is it important to declare `lastTick` as Atomic ? I thought all `volatile` and Atomic things were handled by Rx directly. – Dan Chaltiel May 08 '18 at 10:23
  • 2
    @DanChaltiel Generally speaking, when multiple threads write to a field values that depends on the previous value in the field, the guarantees provided by the `volatile` are not strong enough, and the access needs to be synchronized or arranged through the available Java concurrency APIs. More on the `volatile` http://tutorials.jenkov.com/java-concurrency/volatile.html#when-is-volatile-enough. – AndroidEx May 08 '18 at 13:37
  • So there is no pause in RxJava actually, just unsubscribe then subscribe new one again. – thecr0w May 31 '21 at 03:07
30

Some time ago, I was also looking for kind of RX "timer" solutions, but non of them met my expectations. So there you can find my own solution:

AtomicLong elapsedTime = new AtomicLong();
AtomicBoolean resumed = new AtomicBoolean();
AtomicBoolean stopped = new AtomicBoolean();

public Flowable<Long> startTimer() { //Create and starts timper
    resumed.set(true);
    stopped.set(false);
    return Flowable.interval(1, TimeUnit.SECONDS)
            .takeWhile(tick -> !stopped.get())
            .filter(tick -> resumed.get())
            .map(tick -> elapsedTime.addAndGet(1000));
}

public void pauseTimer() {
    resumed.set(false);
}

public void resumeTimer() {
    resumed.set(true);
}

public void stopTimer() {
    stopped.set(true);
}

public void addToTimer(int seconds) {
    elapsedTime.addAndGet(seconds * 1000);
}
Artur Szymański
  • 1,639
  • 1
  • 19
  • 22
  • 3
    I can't believe nobody has upticked this answer. The accepted answer creates a new Observable after every pause and doesn't accommodate final fields in the lambdas (You have to use class fields). This is a way better solution. – Myles Bennett Dec 14 '17 at 17:38
  • 2
    This is the only answer which is fine to use in production code. – Andrii Kovalchuk Mar 05 '18 at 14:16
  • 1
    I'm curious why a Flowable was chosen instead of an Observable. This requires the subscribing user to explicitly request values, right? – Robert Lewis Aug 06 '20 at 19:01
  • @RobertLewis No You don't have to request values. Flowable support backpressure by default, it was reason. – Artur Szymański Aug 10 '20 at 07:31
  • 1
    I'm confused. My understanding is that a `Flowable` does not emit anything until `request(n)` is called. Why not use an `Observable`? Please explain. – Robert Lewis Sep 01 '20 at 03:04
11
val switch = new java.util.concurrent.atomic.AtomicBoolean(true)
val tick = new java.util.concurrent.atomic.AtomicLong(0L)

val suspendableObservable = 
  Observable.
    interval(5 seconds).
    takeWhile(_ => switch.get()).
    repeat.
    map(_ => tick.incrementAndGet())

You can set switch to false to suspend the ticking and true to resume it.

Sheng
  • 1,697
  • 4
  • 19
  • 33
2

Sorry this is in RxJS instead of RxJava, but the concept will be the same. I adapted this from learn-rxjs.io and here it is on codepen.

The idea is that you start out with two streams of click events, startClick$ and stopClick$. Each click occurring on the stopClick$ stream get mapped to an empty observable, and clicks on startClick$ each get mapped to the interval$ stream. The two resulting streams get merge-d together into one observable-of-observables. In other words, a new observable of one of the two types will be emitted from merge each time there's a click. The resulting observable will go through switchMap, which starts listening to this new observable and stops listening to whatever it was listening to before. Switchmap will also start merge the values from this new observable onto its existing stream.

After the switch, scan only ever sees the "increment" value emitted by interval$, and it doesn't see any values when "stop" has been clicked.

And until the first click occurs, startWith will start emitting values from $interval, just to get things going:

const start = 0;
const increment = 1;
const delay = 1000;
const stopButton = document.getElementById('stop');
const startButton = document.getElementById('start');
const startClick$ = Rx.Observable.fromEvent(startButton, 'click');
const stopClick$ = Rx.Observable.fromEvent(stopButton, 'click');
const interval$ = Rx.Observable.interval(delay).mapTo(increment);
const setCounter = newValue => document.getElementById("counter").innerHTML = newValue;
setCounter(start);

const timer$ = Rx.Observable

    // a "stop" click will emit an empty observable,
    // and a "start" click will emit the interval$ observable.  
    // These two streams are merged into one observable.
    .merge(stopClick$.mapTo(Rx.Observable.empty()), 
           startClick$.mapTo(interval$))

    // until the first click occurs, merge will emit nothing, so 
    // use the interval$ to start the counter in the meantime
    .startWith(interval$)

    // whenever a new observable starts, stop listening to the previous
    // one and start emitting values from the new one
    .switchMap(val => val)

    // add the increment emitted by the interval$ stream to the accumulator
    .scan((acc, curr) => curr + acc, start)

    // start the observable and send results to the DIV
    .subscribe((x) => setCounter(x));

And here's the HTML

<html>
<body>
  <div id="counter"></div>
  <button id="start">
    Start
  </button>
  <button id="stop">
    Stop
  </button>
</body>
</html>
mikebridge
  • 4,209
  • 2
  • 40
  • 50
1

Here is a another way to do this, I think.
When you check the source code, you will find interval() using class OnSubscribeTimerPeriodically. The key code below.

@Override
public void call(final Subscriber<? super Long> child) {
    final Worker worker = scheduler.createWorker();
    child.add(worker);
    worker.schedulePeriodically(new Action0() {
        long counter;
        @Override
        public void call() {
            try {
                child.onNext(counter++);
            } catch (Throwable e) {
                try {
                    worker.unsubscribe();
                } finally {
                    Exceptions.throwOrReport(e, child);
                }
            }
        }

    }, initialDelay, period, unit);
}

So, you will see, if you wanna cannel the loop, what about throwing a new exception in onNext(). Example code below.

Observable.interval(1000, TimeUnit.MILLISECONDS)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    Log.i("abc", "onNext");
                    if (aLong == 5) throw new NullPointerException();
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    Log.i("abc", "onError");
                }
            }, new Action0() {
                @Override
                public void call() {
                    Log.i("abc", "onCompleted");
                }
            });

Then you will see this:

08-08 11:10:46.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:47.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:48.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:49.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:50.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:51.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:51.018 28146-28181/net.bingyan.test I/abc: onError             
Chris Wong
  • 451
  • 3
  • 5
  • 4
    Exceptions should be used only for handling exceptional events not to implement logic! It creates unnecessary objects, it is computationally heavy and syntactically confusing – Luca S. Aug 01 '17 at 11:14
1

You can use takeWhile and loop until conditions is true

Observable.interval(1, TimeUnit.SECONDS)
        .takeWhile {
            Log.i(TAG, " time " + it)
            it != 30L
        }
        .subscribe(object : Observer<Long> {
            override fun onComplete() {
                Log.i(TAG, "onComplete " + format.format(System.currentTimeMillis()))
            }

            override fun onSubscribe(d: Disposable) {
                Log.i(TAG, "onSubscribe " + format.format(System.currentTimeMillis()))
            }

            override fun onNext(t: Long) {
                Log.i(TAG, "onNext " + format.format(System.currentTimeMillis()))
            }

            override fun onError(e: Throwable) {
                Log.i(TAG, "onError")
                e.printStackTrace()
            }

        });
Alier
  • 11
  • 2
0

@AndroidEx , that's a wonderful answer. I did it a bit differently:

private fun disposeTask() {
    if (disposeable != null && !disposeable.isDisposed)
      disposeable.dispose()
  }

 private fun runTask() {
    disposeable = Observable.interval(0, 30, TimeUnit.SECONDS)
.flatMap {
        apiCall.runTaskFromServer()
.map{

when(it){
is ResponseClass.Success ->{
keepRunningsaidTasks()
}
is ResponseClass.Failure ->{
disposeTask() //this will stop the task in instance of a network failure.
}
}

}