Sorry this is in RxJS instead of RxJava, but the concept will be the same. I adapted this from learn-rxjs.io and here it is on codepen.
The idea is that you start out with two streams of click events, startClick$
and stopClick$
. Each click occurring on the stopClick$
stream get mapped to an empty observable, and clicks on startClick$
each get mapped to the interval$
stream. The two resulting streams get merge
-d together into one observable-of-observables. In other words, a new observable of one of the two types will be emitted from merge
each time there's a click. The resulting observable will go through switchMap
, which starts listening to this new observable and stops listening to whatever it was listening to before. Switchmap will also start merge the values from this new observable onto its existing stream.
After the switch, scan
only ever sees the "increment" value emitted by interval$
, and it doesn't see any values when "stop" has been clicked.
And until the first click occurs, startWith
will start emitting values from $interval
, just to get things going:
const start = 0;
const increment = 1;
const delay = 1000;
const stopButton = document.getElementById('stop');
const startButton = document.getElementById('start');
const startClick$ = Rx.Observable.fromEvent(startButton, 'click');
const stopClick$ = Rx.Observable.fromEvent(stopButton, 'click');
const interval$ = Rx.Observable.interval(delay).mapTo(increment);
const setCounter = newValue => document.getElementById("counter").innerHTML = newValue;
setCounter(start);
const timer$ = Rx.Observable
// a "stop" click will emit an empty observable,
// and a "start" click will emit the interval$ observable.
// These two streams are merged into one observable.
.merge(stopClick$.mapTo(Rx.Observable.empty()),
startClick$.mapTo(interval$))
// until the first click occurs, merge will emit nothing, so
// use the interval$ to start the counter in the meantime
.startWith(interval$)
// whenever a new observable starts, stop listening to the previous
// one and start emitting values from the new one
.switchMap(val => val)
// add the increment emitted by the interval$ stream to the accumulator
.scan((acc, curr) => curr + acc, start)
// start the observable and send results to the DIV
.subscribe((x) => setCounter(x));
And here's the HTML
<html>
<body>
<div id="counter"></div>
<button id="start">
Start
</button>
<button id="stop">
Stop
</button>
</body>
</html>