Edit
I think the implementation this question asks for is reasonably complex, and I have not been able to figure it out for three months now. I have re-phrased it in another question here: RxJS5 - How can I cache the last value of a aggregate stream, without using Subjects and whilst excluding those that have completed? , which hopefully summarizes what I am after more concisely. Thank you everyone for your help!
Original
I have now learnt how to deal with state in RxJS by mapping a series of partially applied state operation functions onto my state via scan. However, I would now like to take this a step further and connect streams declaratively between 'producers' and 'consumers' over a dataplane (pubsub layer), without piping them through a Subject.
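For context, the scan-based state pattern mentioned above can be sketched in plain JavaScript, using `Array.prototype.reduce` as a synchronous stand-in for RxJS's `scan` (the `add`/`remove` operation names here are hypothetical, not from my actual code):

```javascript
// Each event is a partially applied function that transforms the state,
// folded over an initial state — the same shape scan gives you over time.
const add = (key, value) => state => ({ ...state, [key]: value });
const remove = key => state => {
  const { [key]: _removed, ...rest } = state;
  return rest;
};

// A "stream" of state operations, reduced over an initial empty state:
const operations = [add('a', 1), add('b', 2), remove('a')];
const finalState = operations.reduce((state, op) => op(state), {});
console.log(finalState); // { b: 2 }
```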
There are many tutorials online covering this, but they all basically use publish imperatively, calling Subject.next in a middle layer for each stream name or channel. This is my current implementation, but it requires creating a never-ending ReplaySubject(1) (to cache values for late-arriving consumers) for each stream, because consumers hold a reference to that Subject, and the reference would become invalid if Subjects were removed whenever no producers were currently streaming to a channel of that name.
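To make the middle-layer approach I am describing concrete, here is a minimal plain-JS stand-in for that ReplaySubject(1)-per-channel layer (the `createChannel`/`channel` helpers are hypothetical names for illustration, not RxJS API):

```javascript
// Each channel caches its last value and replays it to late subscribers,
// mimicking a never-ending ReplaySubject(1).
function createChannel() {
  const subscribers = [];
  let last;
  let hasValue = false;
  return {
    next(value) {
      last = value;
      hasValue = true;
      subscribers.forEach(fn => fn(value));
    },
    subscribe(fn) {
      if (hasValue) fn(last); // replay the cached value, like ReplaySubject(1)
      subscribers.push(fn);
      return () => subscribers.splice(subscribers.indexOf(fn), 1);
    },
  };
}

const channels = new Map();
function channel(name) {
  // The channel object never goes away, so consumer references stay valid —
  // which is exactly the resource-management problem described above.
  if (!channels.has(name)) channels.set(name, createChannel());
  return channels.get(name);
}

// A late subscriber still receives the cached value:
const received = [];
channel('filter.add').next({ id: 1 });
channel('filter.add').subscribe(v => received.push(v));
console.log(received); // [ { id: 1 } ]
```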
I want to connect streams directly and have consumers receive streams that are the aggregate of all active published streams of a particular name. This still requires a Subject to pipe in the initial registering producer streams (an incoming stream of producer streams in the format {name, stream}).
I then want to merge all same-named streams into a single stream that each registering consumer receives as a reference (think of a filter service receiving a reference to filter.add, which is a merge of all active producers creating filters). That aggregate stream should re-merge, reactively and with the consumer's reference still valid, whenever a new producer registers under the same name. Any late-arriving consumer should also receive the last cached value of that aggregate stream.
In this way each aggregate stream needs to be dynamically re-evaluated each time a new stream is exposed on the pubsub layer, so wrapping a stream in a 'getActive' function (like here) doesn't work, as this is imperative and only happens once when the stream is first fetched, rather than being lazily re-evaluated for all consumers every time a new stream is published.
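The snapshot-vs-lazy distinction I mean can be illustrated in plain JavaScript (the `registry` and function names here are hypothetical, purely to show the evaluation-timing difference):

```javascript
// A one-shot 'getActive'-style lookup takes a snapshot at fetch time,
// so it misses producers that register afterwards.
const registry = [];

function getActiveSnapshot() {
  return registry.slice(); // evaluated once; a fixed array
}

function getActiveLazy() {
  return () => registry.slice(); // re-evaluated on every call
}

registry.push('producerA');
const snapshot = getActiveSnapshot();
const lazy = getActiveLazy();

registry.push('producerB'); // a new producer registers later

console.log(snapshot); // [ 'producerA' ]              — stale
console.log(lazy());   // [ 'producerA', 'producerB' ] — sees the new producer
```

What I am after is the second behavior, but expressed declaratively over the stream of published streams rather than via an imperative re-fetch.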
The result should be a stream that:
- never completes;
- aggregates the results of all active streams of a certain name and discards those that are no longer active;
- lazily updates all consumers who have received a reference to that stream, such that if a new stream by the same name is published, the consumer's reference remains valid and is re-evaluated to merge in that new stream;
- caches the last merged result, such that new consumers will receive the last emitted value on subscription.
Basically, I need the 'trimToActiveOnly' function.
function getStream(all$, name) {
  return all$
    .filter(x => x.name === name)
    .map(x => x.stream)
    .map(trimToActiveOnly) // in this way, should be dynamically re-evaluated for all
                           // consumers every time a new stream is published or a stream
                           // ends, not just run once when the stream is first 'got'
    .flatMapLatest(x => Rx.Observable.merge(x)) // where x is all currently active streams
                                                // for this name, with finished/errored
                                                // ones discarded
    .shareReplay(1); // re-evaluated when a new stream is published or when one of the
                     // aggregated streams concludes; replay(1) rather than plain
                     // publish().refCount() so late subscribers get the cached last value.
                     // The aggregate stream itself never concludes, but may go cold if
                     // nothing is subscribed.
}
// desired behavior as follows
const publishStream$ = new Rx.Subject();
const foo$ = getStream(publishStream$, 'foo');
const bar$ = getStream(publishStream$, 'bar');
const fooSourceA$ = new Rx.Subject();
const fooSourceB$ = new Rx.Subject();
const barSourceA$ = new Rx.Subject();
const barSourceB$ = new Rx.Subject();
publishStream$.onNext({ name: 'foo', stream: fooSourceA$ });
publishStream$.onNext({ name: 'foo', stream: fooSourceB$ });
publishStream$.onNext({ name: 'bar', stream: barSourceA$ });
fooSourceA$.onNext('hello');
fooSourceA$.onNext('world');
barSourceA$.onNext('testing');
const fooSub = foo$.subscribe(x => console.log('foo: ' + x)); // should receive cached 'world'
const barSub = bar$.subscribe(x => console.log('bar: ' + x)); // should receive cached 'testing'
publishStream$.onNext({ name: 'bar', stream: barSourceB$ });
barSourceB$.onNext('123'); // barSub should now receive '123' as the aggregated active streams are dynamically re-evaluated on each publish of a new stream!
I also have a JSBin of this here.