  • I want a pubsub system, with producers and consumers of streams, via a dataplane layer but without Subjects.
  • Many producers can multicast to the same stream name (e.g., 'filters.add'), and multiple consumers can subscribe to that stream.
  • Producers register with a name and stream to create a 'stream of published streams'.
  • If a new producer registers with a particular stream name, the streams flowing to consumers are dynamically updated, so that each consumer receives a merge of all currently active published streams with the requested name.

  • One producer stream completing should not cause the aggregate stream to complete; the completed stream is instead excluded (with all consumers updated) when this occurs.

  • A consumer getting a stream after a producer registers should receive the last emitted value of that stream (if there has been one to date).
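The last bullet can be sketched in plain JavaScript (no RxJS; `LastValueBridge` and every other name here is hypothetical, just to illustrate the requirement): a late subscriber immediately receives the most recently emitted value, if there has been one.

```javascript
// Minimal sketch of last-value replay for late subscribers.
class LastValueBridge {
    constructor() {
        this.listeners = [];
        this.hasValue = false;
        this.lastValue = undefined;
    }
    next(value) {
        this.hasValue = true;
        this.lastValue = value;
        this.listeners.forEach(fn => fn(value));
    }
    subscribe(fn) {
        // replay the cached value to late arrivals before live values
        if (this.hasValue) fn(this.lastValue);
        this.listeners.push(fn);
        return () => { this.listeners = this.listeners.filter(l => l !== fn); };
    }
}

const bridge = new LastValueBridge();
bridge.next(4);
bridge.next(8);
const seen = [];
bridge.subscribe(v => seen.push(v));
console.log(seen); // [8] — only the most recent value is replayed
```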

Here is the code I have so far and the tests I wish to satisfy. I can satisfy the base case but not caching, since:

  • completed streams are being excluded even though they may have emitted the most recent value;
  • .cache(1) after stream.publish().refCount() repeats entry 4 from the first Observable, instead of 8 from the second;
  • even with pluckActiveStreams commented out (which also doesn't work on its own), I can't get caching to work with any of the commented-out approaches...

My questions are:

  • how can I implement caching of the aggregated stream? None of the commented-out final lines of relevantStreams$ accomplishes this.
  • I think I need to remove completed streams from the aggregate stream so that the aggregate itself never completes. pluckActiveStreams (based on the solution given here) potentially accomplishes this, but how can I still give late-arriving consumers the last emitted value of the stream, even if it came from a now-completed stream?
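The intent behind that second question can be illustrated with a plain-JavaScript sketch (`AggregateStream` and `producerOf` are hypothetical names, not RxJS APIs): a completed producer is dropped from the active set so the aggregate never completes, while the aggregate keeps its own last-value cache, so a late consumer still receives a value even if it came from a now-completed stream.

```javascript
// Sketch: exclude completed producers but keep the aggregate's cache.
class AggregateStream {
    constructor() {
        this.active = new Set();   // currently non-completed producers
        this.listeners = [];
        this.hasValue = false;
        this.lastValue = undefined;
    }
    addProducer(producer) {
        this.active.add(producer);
        producer.start(
            value => {
                this.hasValue = true;
                this.lastValue = value;
                this.listeners.forEach(fn => fn(value));
            },
            // on complete: drop the producer; the aggregate itself
            // never completes and the cached value survives
            () => this.active.delete(producer)
        );
    }
    subscribe(fn) {
        if (this.hasValue) fn(this.lastValue);
        this.listeners.push(fn);
    }
}

// A synchronous producer of fixed values followed by completion.
const producerOf = (...values) => ({
    start(onNext, onComplete) { values.forEach(onNext); onComplete(); }
});

const agg = new AggregateStream();
const acc1 = [];
agg.subscribe(v => acc1.push(v));       // consumer before publish
agg.addProducer(producerOf(1, 2, 3, 4));
agg.addProducer(producerOf());          // empty stream must not kill the aggregate
agg.addProducer(producerOf(5, 6, 7, 8));
const acc2 = [];
agg.subscribe(v => acc2.push(v));       // late consumer gets cached 8
console.log(acc1); // [1, 2, 3, 4, 5, 6, 7, 8]
console.log(acc2); // [8]
```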

My current implementation already accomplishes this by using observables of ReplaySubjects internally as a 'bridge' between producers and consumers. But with this approach:

  1. all producer streams are automatically made hot;
  2. the ReplaySubject needs a hack to ensure it never completes if any of the streams multicasting to it do (otherwise it shuts down the stream for all consumers, as explained here);
  3. Once created upon registration of the first producer, a ReplaySubject representing a stream name for a particular group of producers can never again be made cold, disposed of when there are no current producers for that name, or re-instantiated with different arguments, because any previously-registered consumers of that stream would then hold a stale reference. Consumers therefore need a reference to the reactively updated, declarative aggregate of active streams for a given name.
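The hack in point 2 can be sketched as a wrapper observer that forwards next and error but swallows complete, so no single producer can shut the shared subject down (plain-JavaScript sketch; `shieldComplete` is a hypothetical name):

```javascript
// Wrap an observer so upstream completion is deliberately ignored.
function shieldComplete(observer) {
    return {
        next: v => observer.next(v),
        error: e => observer.error(e),
        complete: () => { /* swallowed: the shared subject stays open */ }
    };
}

const events = [];
const shared = {
    next: v => events.push(v),
    error: e => events.push('error:' + e),
    complete: () => events.push('complete')
};
const shielded = shieldComplete(shared);
shielded.next(1);
shielded.complete(); // swallowed — 'complete' is never recorded
shielded.next(2);    // the shared observer is still usable
console.log(events); // [1, 2]
```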

Code

const publishStream$ = new Subject();

function publishStream(name, stream) {
    // publish streams as connectables to ensure a single source of truth
    // (i.e., don't duplicate server requests)
    publishStream$.next({ name, stream: stream.publish().refCount() });
}

const availableStreams$ = publishStream$
    .startWith([])
    .scan((streams, stream) => {
        console.log('\nadding stream:', stream.name)
        // (this would eventually be state operations to add/remove streams
        // so this doesn't just keep growing)
        return streams.concat(stream)
    })
    .publish();
availableStreams$.connect()

function getStream(name) {

    const relevantStreams$ = availableStreams$
        .do(res => console.log(`\nstream recalculates here for ${name}`))
        .map(streams => streams.filter(streamObj => streamObj.name === name))
        .filter(streams => streams.length)
        .map(streamObjs => streamObjs.map(streamObj => streamObj.stream))
        .map(pluckActiveStreams)
        .switchMap(streams => Observable.merge(...streams))
        // add caching for late subscribers 
        // per https://github.com/ReactiveX/rxjs/issues/1541?
        // (none of the approaches below work):
        //.multicast(() => new ReplaySubject()).refCount()
        //.publishReplay(1).refCount()
        //.share().repeat(1).publish()
        //.cache(1)

    return relevantStreams$;

}


function pluckActiveStreams(streams) {
    const activeStreams = new Subject();
    const elements = [];
    const subscriptions = [];

    streams.forEach(stream => {
        var include = true;
        const subscription = stream.subscribe(x => {
            console.log('value', x)
        }, x => {
            console.log('error:', x)
        }, x => {
            console.log('completed', x)
            include = false;
            const i = elements.indexOf(stream);
            if (i > -1) {
                elements.splice(i, 1);
                activeStreams.next(elements.slice());
            }
        });
        if (include) {
            elements.push(stream);
            subscriptions.push(subscription);
        }
    });

    activeStreams.next(elements.slice());

    const pluckedStreams = Observable.using(
        () => new Subscription(() => subscriptions.forEach(x => x.unsubscribe())),
        () => activeStreams
    );
    return pluckedStreams;
}

Tests

var acc1 = [];
var acc2 = [];

// all streams published get publish().refCount() internally
var obs1 = Observable.of(1, 2, 3, 4);
var obs2 = Observable.empty();
var obs3 = Observable.of(5, 6, 7, 8);

// consumer before publish - will receive values
var sub1 = dp.getStream('foo').subscribe(function (i) {
    console.log('subscription receives: ', i)
    acc1.push(i);
});

var pub1 = dp.publishStream('foo', obs1);
console.log('end of publishStream1')
var pub2 = dp.publishStream('foo', obs2);
console.log('end of publishStream2')
var pub3 = dp.publishStream('foo', obs3);
console.log('end of publishStream3')

// consumer after publish - should receive last cached value
// from active aggregated stream
var sub3 = dp.getStream('foo').subscribe(function (i) {
    console.log("\ncached value received (I also don't fire :( ", i)
    acc2.push(i);
});

var i = setTimeout(function () {
    expect(acc1).to.deep.equal([1, 2, 3, 4, 5, 6, 7, 8]);
    expect(acc2).to.deep.equal([8]);
    done();
}, 10);
brokenalarms
  • so what is the question? – user3743222 Jul 21 '16 at 11:29
  • I thought it was just "so how can I do this?", but I have tried to add a question more specifically and updated the title, thanks! – brokenalarms Jul 21 '16 at 19:49
  • what is the rationale for the `without subjects` requirement? – user3743222 Jul 21 '16 at 20:04
  • I've updated my answer to include this info, thank you. – brokenalarms Jul 21 '16 at 23:23
  • not sure I understand your explanations, as it is quite wordy, and in the end, in those cases a drawing, and some examples of input/outputs would help more than text, but : hot streams are still lazy i.e. you need to start them, you can add a hack similar to the one you showed to replay subjects so it does not complete when it should not, and if you want to have a hot/cold/hot/cold behaviour for an observable, check the `singleInstance` operator (Rxjs v4). In Rxjs v5, I am not sure what operator you would have to use for replay, but `share` in v5 has the behaviour of reconnection you mention – user3743222 Jul 22 '16 at 02:05
  • i.e being hot when subscribed to, cold when no subscribers, and hot again when new subscribers come. In short my recommendation is : use subjects. – user3743222 Jul 22 '16 at 02:06

0 Answers