It depends on the source of the stop/resume signal. The simplest way I can think of is the `pausable` operator, which, as the documentation says, works better with hot observables. So in the following sample code I removed the `take(10)` (your pause signal now comes through the `pauser` subject) and added `share` to turn your observable into a hot one.
// Control channel for the pausable stream: pushing `true` lets items flow,
// pushing `false` holds them back.
var pauser = new Rx.Subject();

// Ticker formatted as "<value>:<interval>", shared between subscribers and
// gated by `pauser`.
var source = Rx.Observable
  .interval(500)
  .timeInterval()
  .map(function formatTick(tick) {
    return tick.value + ':' + tick.interval;
  })
  .share()
  .pausable(pauser);

// Render every notification into the #result element.
var subscription = source.subscribe(
  function onNext(item) {
    $("#result").append('Next: ' + item + ' ');
  },
  function onError(error) {
    $("#result").append('Error: ' + error);
  },
  function onCompleted() {
    $("#result").append('Completed');
  }
);

// To begin the flow
pauser.onNext(true); // or source.resume();

// To pause the flow at any point
pauser.onNext(false); // or source.pause();
Here is a more sophisticated example, which toggles your source between paused and flowing every 10 items:
// Helper functions
/**
 * Builds a logging callback that appends one line per received value to
 * `who.innerHTML`, in the form `<who_> emits <JSON of the value>`.
 *
 * @param {Object} who - Object whose `innerHTML` property is appended to.
 * @param {string} who_ - Label written in front of " emits ".
 * @returns {Function} Callback taking the value to log.
 */
function emits(who, who_) {
  return function (x) {
    var previous = who.innerHTML;
    var entry = who_ + " emits " + JSON.stringify(x);
    who.innerHTML = [previous, entry].join("\n");
  };
}
// Control channel: each boolean pushed here resumes (true) or pauses (false)
// the pausable stream below.
var pauser = new Rx.Subject();

// Shared ticker, formatted as "<value>:<interval>".
var source = Rx.Observable
  .interval(500)
  .timeInterval()
  .map(function (x) { return x.value + ':' + x.interval; })
  .share();

// Same items as `source`, but gated by `pauser`.
var pausableSource = source
  .pausable(pauser);

// Drive the pauser from the source itself: count the items, then emit `true`
// while the count's tens digit is odd and `false` while it is even, so the
// signal flips every 10 items.
// NOTE(review): `ta_validation` is not defined in this snippet; presumably it
// is the page element with that id, exposed as a global. Confirm.
source
  .scan(function (acc, _) { return acc + 1; }, 0)
  .map(function (counter) {
    // Math.floor truncates the number directly; parseInt would first convert
    // it to a string and was being called without a radix.
    return Math.floor(counter / 10) % 2 === 1;
  })
  .do(emits(ta_validation, 'scan'))
  .subscribe(pauser);

// Render every notification of the gated stream into the #ta_result element.
var subscription = pausableSource.subscribe(
  function (x) {
    $("#ta_result").append('Next: ' + x + ' ');
  },
  function (err) {
    $("#ta_result").append('Error: ' + err);
  },
  function () {
    $("#ta_result").append('Completed');
  }
);
By now you should have your answer to the second question: combine the observables you are given with the relevant RxJS operators to realize your use case. That is what I did here.