I need to create a sequence of messages.

  • I should be able to "push" a message in sequence
  • I should be able to "query" all the messages
  • I should be able to notify all the listeners on a new message.

So far I have ended up with:

var math = require('mathjs')
var Rx = require('rx')
var _ = require('lodash')

var messagesSubject = new Rx.Subject()
var messagesPool = messagesSubject.map(function(el) { return [el]}).scan([], _.union)


Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()

  .filter(
    function() { return math.randomInt(10) > 8;}
  )
  .do(function(x) {
    messagesSubject.subscribe(function(msg) {
      console.log('subscriber ' + x.value + ' do something with ' + msg.text)
    })
  }).subscribe()

Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 2;}
  )
  .map(function() {
    return { text: math.pickRandom(['one', 'two', 'three'])}
  }).subscribe(messagesSubject)

How can I notify every new subscriber of all the previous messages (messagesPool)?

Side question: is this a valid use case for a Subject? Or should I choose another type of subject?

kharandziuk
  • Sounds fine to me. I suppose you push messages with `messagesSubject.onNext`, or `messagesSubject.next` (for RxJS v5), right? Otherwise you can review some other ways to use subjects here: http://stackoverflow.com/questions/34849873/what-are-the-semantics-of-different-rxjs-subjects – user3743222 Mar 22 '16 at 21:45
  • `onNext` or just 'pipe' (but I don't know whether that is possible at the moment) – kharandziuk Mar 22 '16 at 21:47
  • I don't know about `pipe`. But `next` is part of the subject API for sure. You just have to be careful to use the right API for your RxJS version, as there have been some changes in RxJS v5 – user3743222 Mar 22 '16 at 21:50
  • @user3743222 I described my question better (I hope). Can you check? By 'pipe' I mean `.subscribe(messagesSubject)`. It's much easier for me to think about piping in this case – kharandziuk Mar 23 '16 at 12:13
  • If you want to notify new subscribers of previous emissions, you need a `Rx.ReplaySubject`. You can specify there how many of the previous emissions you want the new subscriber to receive, or pass nothing if you want to replay the whole list of emissions from the beginning. However, note that this works internally using a buffer, so make sure you are not holding on to too much memory. – user3743222 Mar 23 '16 at 12:18
  • `messagesPool` is intended to be a kind of buffer. – kharandziuk Mar 23 '16 at 12:28

3 Answers

Sounds like you're looking for ReplaySubject rather than Subject.

[ReplaySubject is a] Subject that buffers all items it observes and replays them to any Observer that subscribes.
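
To make the replay behaviour concrete, here is a minimal, dependency-free sketch of the idea. `TinyReplaySubject` is a hypothetical name made up for this illustration and is not part of Rx; its method names (`onNext`, `subscribe`, `dispose`) merely mirror the RxJS 4 style used in the question, and the optional `bufferSize` argument is an assumption modelled on the bounded buffer mentioned in the comments. In the question's own code, the equivalent change would presumably be to create `messagesSubject` with `new Rx.ReplaySubject()` instead of `new Rx.Subject()`.

```javascript
// Hypothetical illustration of replay semantics; NOT the Rx implementation.
function TinyReplaySubject(bufferSize) {
  // No argument means "remember every message" (unbounded buffer).
  this.bufferSize = bufferSize === undefined ? Infinity : bufferSize;
  this.buffer = [];
  this.observers = [];
}

// "Push" a message: remember it, then notify every current listener.
TinyReplaySubject.prototype.onNext = function (msg) {
  this.buffer.push(msg);
  if (this.buffer.length > this.bufferSize) {
    this.buffer.shift(); // drop the oldest message when bounded
  }
  // Iterate over a copy so a dispose() during notification is safe.
  this.observers.slice().forEach(function (observer) { observer(msg); });
};

// New subscribers first receive the buffered messages, then live ones.
TinyReplaySubject.prototype.subscribe = function (observer) {
  var observers = this.observers;
  this.buffer.forEach(function (msg) { observer(msg); });
  observers.push(observer);
  return {
    dispose: function () {
      var i = observers.indexOf(observer);
      if (i !== -1) { observers.splice(i, 1); }
    }
  };
};

var subject = new TinyReplaySubject();
var early = [];
var late = [];

var earlySub = subject.subscribe(function (msg) { early.push(msg.text); });
subject.onNext({ text: 'one' });
subject.onNext({ text: 'two' });

// The late subscriber is immediately caught up with 'one' and 'two'.
subject.subscribe(function (msg) { late.push(msg.text); });
earlySub.dispose();
subject.onNext({ text: 'three' });

// A bounded buffer replays only the most recent message(s).
var lastOnly = new TinyReplaySubject(1);
var bounded = [];
lastOnly.onNext({ text: 'one' });
lastOnly.onNext({ text: 'two' });
lastOnly.subscribe(function (msg) { bounded.push(msg.text); });

console.log(early);   // [ 'one', 'two' ]
console.log(late);    // [ 'one', 'two', 'three' ]
console.log(bounded); // [ 'two' ]
```

The buffer plays the role that `messagesPool` was meant to play in the question, which is why a separate pool may become unnecessary once the subject itself replays.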

Richard Szalay

As others have pointed out, ReplaySubject can be your friend here.

This may mean that you can remove your message pool feature.

You can also get rid of the subject entirely if you just compose your queries:

var math = require('mathjs')
var Rx = require('rx')
var _ = require('lodash')

var messages = Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 2;}
  )
  .map(function() {
    return { text: math.pickRandom(['one', 'two', 'three'])}
  })
  .replay();

//Randomly add subscribers (but this would only be dummy code, not suitable for prod)
var randomSubscriberAdder = Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 8;}
  )
  .subscribe(function(x) {
    messages.subscribe(function(msg) {
      console.log('subscriber ' + x.value + ' do something with ' + msg.text);
    });
  });

var connection = messages.connect();
//messages will now be collecting all values.
//  Late subscribers will get all previous values.
//  As new values are published, existing subscribers will get the new value.

You may be better off using hard-coded sets of data and the Rx testing tools/libs. This way you have control over which edge cases you are testing (early subscriber, late subscriber, disconnecting subscriber, silence on the stream, etc.).

Lee Campbell
  • Hey, nice answer. But the first sequence emulates adding new subscribers to messagePool. Do you have an idea how I can rewrite this without the `do` statement? – kharandziuk Mar 29 '16 at 07:02
  • Oh! I see now. Personally I would remove all the randomness, and use the testing tools that Rx comes with. Is there a reason that the `function` in the `Do` can't just be moved to the `Subscribe`? – Lee Campbell Mar 29 '16 at 07:11
  • I'm just trying to emulate how a message pool could work (it's just a study sample!). There are some subscribers which appear randomly (for now I don't know how to make them disconnect sometimes), and some of them send messages to the pool. Each new subscriber receives all the messages. Do you understand the idea? – kharandziuk Mar 29 '16 at 07:17
  • Updated answer (almost a rewrite) – Lee Campbell Mar 29 '16 at 07:18
  • Hey, thanks a lot for your answer. I see no reason not to accept it. But if you have time, could you give me an idea of how I can "remove" some subscribers randomly? – kharandziuk Mar 29 '16 at 07:21
  • added that as another full answer below. – Lee Campbell Mar 29 '16 at 08:22

An example of the code without the use of subjects, unit tested with replay semantics and transient subscribers. It runs on Node with nodeunit.

(Windows commands)

npm install rx
npm install nodeunit
.\node_modules\.bin\nodeunit.cmd tests

and the code in the test directory.

var Rx = require('rx')

var onNext = Rx.ReactiveTest.onNext,
    onError = Rx.ReactiveTest.onError,
    onCompleted = Rx.ReactiveTest.onCompleted,
    subscribe = Rx.ReactiveTest.subscribe;

exports.testingReplayWithTransientSubscribers = function(test){
    //Declare that we expect to have 3 asserts enforced.
    test.expect(3);

    //Control time with a test scheduler
    var scheduler = new Rx.TestScheduler();
    //Create our known messages that will be published at known times (all times in milliseconds).
    //Note: times are written without leading zeros; a literal like 0500 is parsed as legacy octal (320) in sloppy mode.
    var messages = scheduler.createColdObservable(
        onNext(500, 'one'),
        onNext(1000, 'two'),
        onNext(2000, 'three'),
        onNext(3500, 'four'),
        onNext(4000, 'five')
    );

    //Replay all messages, and connect the replay decorator.
    var replay = messages.replay();
    var connection = replay.connect();

    //Create 3 observers to subscribe/unsubscribe at various times.
    var observerA = scheduler.createObserver();
    var observerB = scheduler.createObserver();
    var observerC = scheduler.createObserver();

    //Subscribe immediately
    var subA = replay.subscribe(observerA);
    //Subscribe late, missing 1 message
    var subB = Rx.Disposable.empty;
    scheduler.scheduleAbsolute(null, 800, function(){subB = replay.subscribe(observerB);});
    //Subscribe late, and dispose before any live message happens
    var subC = Rx.Disposable.empty;
    scheduler.scheduleAbsolute(null, 1100, function(){subC = replay.subscribe(observerC);});
    scheduler.scheduleAbsolute(null, 1200, function(){subC.dispose();});
    //Dispose early
    scheduler.scheduleAbsolute(null, 3000, function(){subB.dispose();});


    //Start virtual time. Run through all the scheduled work (publishing messages, subscribing and unsubscribing)
    scheduler.start();

    //Assert our assumptions.
    test.deepEqual(observerA.messages, [
            onNext(500, 'one'),
            onNext(1000, 'two'),
            onNext(2000, 'three'),
            onNext(3500, 'four'),
            onNext(4000, 'five')
        ],
        "ObserverA should receive all values");
    test.deepEqual(observerB.messages, [
            onNext(800, 'one'),
            onNext(1000, 'two'),
            onNext(2000, 'three'),
        ],
        "ObserverB should receive initial value on subscription, and then two live values");
    test.deepEqual(observerC.messages, [
            onNext(1100, 'one'),
            onNext(1100, 'two'),
        ],
        "ObserverC should only receive initial values on subscription");
    test.done();
};
Lee Campbell