An example of the code without the use of subjects, unit tested for Replay semantics with transient subscribers. It runs on Node.js with nodeunit.
(windows cmds)
npm install rx
npm install nodeunit
.\node_modules\.bin\nodeunit.cmd tests
The test code below goes in the tests directory.
// RxJS, plus short aliases for the ReactiveTest factories that build
// recorded notifications/subscriptions for comparison in assertions.
var Rx = require('rx');
var onNext = Rx.ReactiveTest.onNext;
var onError = Rx.ReactiveTest.onError;
var onCompleted = Rx.ReactiveTest.onCompleted;
var subscribe = Rx.ReactiveTest.subscribe;
exports.testingReplayWithTransientSubscribers = function(test){
//Declare that we expect to have 3 asserts enforced.
test.expect(3);
//Control time with a test scheduler
var scheduler = new Rx.TestScheduler();
//Create our known message that will be published at known times (all times in milliseconds).
var messages = scheduler.createColdObservable(
onNext(0500, 'one'),
onNext(1000, 'two'),
onNext(2000, 'three'),
onNext(3500, 'four'),
onNext(4000, 'five')
);
//Replay all messages, and connect the reply decorator.
var replay = messages.replay();
var connection = replay.connect();
//Create 3 observers to subscribe/unsubscribe at various times.
var observerA = scheduler.createObserver();
var observerB = scheduler.createObserver();
var observerC = scheduler.createObserver();
//Subscribe immediately
var subA = replay.subscribe(observerA);
//Subscribe late, missing 1 message
var subB = Rx.Disposable.empty;
scheduler.scheduleAbsolute(null, 0800, function(){subB = replay.subscribe(observerB);});
//Subscribe late, and dispose before any live message happen
var subC = Rx.Disposable.empty;
scheduler.scheduleAbsolute(null, 1100, function(){subC = replay.subscribe(observerC);});
scheduler.scheduleAbsolute(null, 1200, function(){subC.dispose();});
//Dispose early
scheduler.scheduleAbsolute(null, 3000, function(){subB.dispose();});
//Start virutal time. Run through all the scheduled work (publishing messages, subscribing and unsubscribing)
scheduler.start();
//Assert our assumptions.
test.deepEqual(observerA.messages, [
onNext(0500, 'one'),
onNext(1000, 'two'),
onNext(2000, 'three'),
onNext(3500, 'four'),
onNext(4000, 'five')
],
"ObserverA should receive all values");
test.deepEqual(observerB.messages, [
onNext(0800, 'one'),
onNext(1000, 'two'),
onNext(2000, 'three'),
],
"ObserverB should receive initial value on subscription, and then two live values");
test.deepEqual(observerC.messages, [
onNext(1100, 'one'),
onNext(1100, 'two'),
],
"ObserverC should only receive initial values on subscription");
test.done();
};