15

I have a very simple timeInterval observable and I want to start/stop transmission without disconnecting subscribers (which should sit and wait regardless of the observable's status). Is this possible, and if so, how?

var source = Rx.Observable
  .interval(500)
  .timeInterval()
  .map(function (x) { return x.value + ':' + x.interval; })
  .take(10);

var subscription = source.subscribe(
  function (x) {
    $("#result").append('Next: ' + x + ' ');
  },
  function (err) {
    $("#result").append('Error: ' + err);
  },
  function () {
    $("#result").append('Completed');
  });

General comment: most of the examples I've seen show how to define observables and subscribers. How do I affect the behavior of existing objects?

Laurence Fass

2 Answers

17

It depends on where the stop/resume signal comes from. The simplest way I can think of is the pausable operator, which, as the documentation says, works better with hot observables. So in the following sample code I removed the take(10) (the pause signal now comes through the pauser subject) and added share to turn your observable into a hot one.

var pauser = new Rx.Subject();
var source = Rx.Observable
  .interval(500)
  .timeInterval()
  .map(function (x) { return x.value + ':' + x.interval; })
  .share()
  .pausable(pauser);

var subscription = source.subscribe(
  function (x) {
     $("#result").append('Next: ' + x + ' ');
  },
  function (err) {
    $("#result").append('Error: ' + err);
  },
  function () {
    $("#result").append('Completed');
});

// To begin the flow
pauser.onNext(true); // or source.resume();

// To pause the flow at any point
pauser.onNext(false); // or source.pause();
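
To make the idea of "pausing without disconnecting subscribers" concrete, here is a minimal sketch in plain JavaScript. It does not use RxJS: `createGate` and its methods are hypothetical names invented for this illustration, and it assumes that values arriving while the gate is closed are simply dropped.

```javascript
// Hypothetical helper, not part of RxJS: a tiny gate that keeps its
// subscribers attached and only forwards values while it is open.
function createGate() {
  var subscribers = [];
  var open = false; // starts closed, like a pauser that has not emitted true yet

  return {
    subscribe: function (fn) {
      subscribers.push(fn);
    },
    setOpen: function (flag) {
      open = !!flag;
    },
    push: function (value) {
      if (!open) { return; } // paused: the value is dropped, subscribers stay attached
      subscribers.forEach(function (fn) { fn(value); });
    },
    subscriberCount: function () {
      return subscribers.length;
    }
  };
}

var gate = createGate();
var received = [];
gate.subscribe(function (x) { received.push(x); });

gate.push(0);        // dropped, the gate is still closed
gate.setOpen(true);  // resume
gate.push(1);
gate.push(2);
gate.setOpen(false); // pause
gate.push(3);        // dropped
gate.setOpen(true);  // resume again
gate.push(4);
```

The subscriber is registered once and never removed; only the flag decides whether a value reaches it.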

Here is a more sophisticated example which will pause your source every 10 items:

// Helper function: returns a handler that appends each emitted value to the element `who`
function emits(who, who_) {
  return function (x) {
    who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
  };
}

var pauser = new Rx.Subject();
var source = Rx.Observable
  .interval(500)
  .timeInterval()
  .map(function (x) { return x.value + ':' + x.interval; })
  .share();
var pausableSource = source
  .pausable(pauser);

// Count the emitted items and flip the pause signal every 10 items
source
  .scan(function (acc, _) { return acc + 1; }, 0)
  .map(function (counter) { return !!(Math.floor(counter / 10) % 2); })
  .do(emits(ta_validation, 'scan'))
  .subscribe(pauser);

var subscription = pausableSource.subscribe(
  function (x) {
     $("#ta_result").append('Next: ' + x + ' ');
  },
  function (err) {
    $("#ta_result").append('Error: ' + err);
  },
  function () {
    $("#ta_result").append('Completed');
});
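
To see what the pauser receives in this example, the mapping from the scan/map chain can be run on its own. This is only a sketch: `pauseSignal` is a hypothetical name, and it is written with `Math.floor`, which gives the same result as `parseInt(counter/10)` for the positive counters produced here.

```javascript
// The counter-to-signal mapping, extracted as a plain function.
// The counter starts at 1 because scan is seeded with 0 and adds 1 per item.
function pauseSignal(counter) {
  return !!(Math.floor(counter / 10) % 2);
}

var signals = [1, 9, 10, 19, 20, 29, 30].map(pauseSignal);
// signals: [false, false, true, true, false, false, true]
```

The signal starts as false and flips every 10 items.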

By now you should have the answer to your second question: combine the observables you are given with the relevant RxJS operators to realize your use case. That is what I did here.

user3743222
  • Thanks for your reply. I've added a button to pause/play and I notice that the stream is resetting on every resume, so it's not doing exactly what I need, which is a freeze/resume without restart. – Laurence Fass Jan 27 '16 at 17:05
  • Do you know the difference between a hot and a cold observable? Have a look here: http://stackoverflow.com/questions/32190445/hot-and-cold-observables-are-there-hot-and-cold-operators/34669444#34669444. If your stream is resetting, it probably means that you are using a cold observable, i.e. an observable which is 'reset' every time it has a new subscriber. The only way to make sure is to see your code. In my example, I added a `.share()` to make sure the stream was hot before I passed it to the `pausable` operator. Otherwise I would have the same problems you have now. – user3743222 Jan 27 '16 at 21:07
0

Not the most elegant, but probably the simplest:

// Class members; needs: import { interval, Observable, Subscription } from 'rxjs';
timeSubscription: Subscription;
timer: Observable<number>;
time = 0;

toggle() {
  if (!this.timer) {
    this.timer = interval(500);
  }

  if (!this.timeSubscription || this.timeSubscription.closed) {
    // running: `time` lives outside the subscription, so it keeps its value across pauses
    this.timeSubscription = this.timer.subscribe(tick => {
      console.log(this.time++);
    });
  } else {
    this.timeSubscription.unsubscribe(); // not running
  }
}
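
Why the counter does not restart with this approach can be shown with a small sketch in plain JavaScript. It uses no RxJS and manual ticks instead of a real timer; `createColdTicker`, `toggle` and `tick` are hypothetical names for this illustration only.

```javascript
// Hypothetical stand-in for a cold source: every instance starts a fresh index.
function createColdTicker() {
  var index = 0;
  return { tick: function () { return index++; } };
}

var time = 0;      // kept outside the "subscription", so it survives a pause
var active = null; // current "subscription", or null while paused
var seen = [];

function toggle() {
  active = active ? null : createColdTicker();
}

function tick() {
  if (!active) { return; }          // paused: nothing is delivered
  var sourceIndex = active.tick();  // restarts at 0 after each resume
  seen.push([sourceIndex, time++]); // external counter keeps going
}

toggle();       // start
tick(); tick();
toggle();       // pause
tick();         // ignored
toggle();       // resume
tick();
// seen: [[0, 0], [1, 1], [0, 2]]
```

The source's own index resets on every resume, while the externally stored counter continues from where it stopped.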
EladTal