
Edit

I think the implementation of this question is reasonably complex, and I have not been able to figure it out for 3 months now. I have re-phrased it in another question here: RxJS5 - How can I cache the last value of an aggregate stream, without using Subjects and whilst excluding those that have completed?, which hopefully summarizes what I am after a little more concisely. Thank you everyone for your help!

Original

I have now learnt how to deal with state in RxJS by mapping a series of partially applied state operation functions onto my state via scan. However, I would now like to take this a step further and connect streams declaratively between 'producers' and 'consumers' over a dataplane (pubsub layer), without piping them through a Subject.

There are many tutorials online covering this, but they all essentially use publish imperatively, calling Subject.next in a middle layer for each stream name or channel. This is my current implementation, but it requires creating a never-ending ReplaySubject(1) for each stream (to cache values for late-arriving consumers), because any consumer getting the stream receives a reference that would become invalid if Subjects were removed once no producers were streaming to a channel of that name.
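For illustration, that imperative middle layer can be sketched without Rx at all. The following is a hypothetical plain-JavaScript stand-in (`createDataplane` and its internals are my names, not from any library) that emulates the per-channel ReplaySubject(1) semantics: one cached value per channel, replayed to late subscribers:

```javascript
// Hypothetical sketch of the imperative middle layer described above:
// a subject-like channel per name, caching the last value (ReplaySubject(1)-style).
function createDataplane() {
  const channels = new Map(); // name -> { subscribers, last, hasValue }

  function channel(name) {
    if (!channels.has(name)) {
      channels.set(name, { subscribers: new Set(), last: undefined, hasValue: false });
    }
    return channels.get(name);
  }

  return {
    publish(name, value) {
      // the imperative Subject.next call each tutorial performs
      const ch = channel(name);
      ch.last = value;
      ch.hasValue = true;
      ch.subscribers.forEach(fn => fn(value));
    },
    subscribe(name, fn) {
      const ch = channel(name);
      ch.subscribers.add(fn);
      if (ch.hasValue) fn(ch.last); // replay the cached value to late arrivals
      return () => ch.subscribers.delete(fn); // unsubscribe
    }
  };
}
```

Note the drawback the question describes: each channel entry lives forever, whether or not any producer is still active.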

I want to connect streams directly and have consumers taking in streams that are the aggregate of all active published streams of a particular name. This still requires a Subject to pipe in the initial registering producer streams (an incoming stream of producer streams in the format {name, stream}).

I want to then merge all same-named streams into a single stream that each registering consumer receives as a reference (think of a filter service receiving a reference to filter.add, which is a merge of all active producers creating filters) - but have this stream re-merge, reactively and with the consumer's link to it still valid, if a new producer with the same name also registers. Any late-arriving consumer should also receive the last cached value of that aggregate stream.

In this way each aggregate stream needs to be dynamically re-evaluated each time a new stream is exposed on the pubsub layer, so wrapping a stream in a 'getActive' function (like here) doesn't work: that is imperative and happens only once when the stream is first fetched, rather than being lazily re-evaluated for all consumers every time a new stream is published.

The result should be a stream that:

  • never completes;
  • aggregates the results of all active streams of a certain name and discards those that are no longer active;
  • lazily updates all consumers holding a reference to it, so that if a new stream with the same name is published, the consumer's reference remains valid and is re-evaluated to merge in the new stream;
  • caches the last merged result, so that new consumers receive the last emitted value on subscription.

Basically, I need the 'trimToActiveOnly' function:

function getStream(all$, name) {
  return all$
    .filter(x => x.name === name)
    .map(x => x.stream)
    .map(trimToActiveOnly)   // should be dynamically re-evaluated for all consumers
                             // every time a new stream is published or a stream ends,
                             // not just run once when the stream is first 'got'
    .flatMapLatest(x => Rx.Observable.merge(x)) // x is all currently active streams for this
                                                // name, with finished/errored ones discarded
    .publish().refCount();   // re-evaluated when a new stream is published or one of the
                             // aggregated streams concludes; the aggregate stream itself
                             // never concludes but may go cold if nothing is subscribed
}
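The bookkeeping trimToActiveOnly would have to perform can be sketched as a plain reducer over add/complete/error events (a hypothetical stand-in of my own; in Rx this logic would live inside a scan over notifications about each published stream):

```javascript
// Hypothetical reducer for tracking the set of active streams.
// 'add' fires when a producer publishes a stream; 'complete'/'error'
// fire when that stream terminates and must be discarded.
function activeReducer(activeStreams, event) {
  switch (event.type) {
    case 'add':
      return activeStreams.concat([event.stream]);
    case 'complete':
    case 'error':
      return activeStreams.filter(s => s !== event.stream);
    default:
      return activeStreams;
  }
}
```

Each output of this reducer is the array of currently active streams, which is what the flatMapLatest/merge step above would consume.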

// desired behavior as follows
const publishStream$ = new Rx.Subject();

const foo$ = getStream(publishStream$, 'foo');
const bar$ = getStream(publishStream$, 'bar');

const fooSourceA$ = new Rx.Subject();
const fooSourceB$ = new Rx.Subject();
const barSourceA$ = new Rx.Subject();
const barSourceB$ = new Rx.Subject();
publishStream$.onNext({ name: 'foo', stream: fooSourceA$ });
publishStream$.onNext({ name: 'foo', stream: fooSourceB$ });
publishStream$.onNext({ name: 'bar', stream: barSourceA$ });
fooSourceA$.onNext('hello');
fooSourceA$.onNext('world');
barSourceA$.onNext('testing');

const fooSub = foo$.subscribe(x => console.log('foo: ' + x)); // should receive cached 'world'
const barSub = bar$.subscribe(x => console.log('bar: ' + x)); // should receive cached 'testing'

publishStream$.onNext({ name: 'bar', stream: barSourceB$ });
barSourceB$.onNext('123'); // barSub should now receive '123' as the aggregated active streams are dynamically re-evaluated on each publish of a new stream!

I also have a JSBin of this here.

brokenalarms

1 Answer

It's not exactly the same, but this bears similarities to Rx: a zip-like operator that continues after one of the streams ended?

You've got an Observable<{name, stream}>. You can apply filter to reduce it to only the substreams that meet a certain criterion. You can then use map to drop the name and get an Observable<Observable>, and from there you can use any reduction you like to combine the values (e.g. zip to match up by index, combineLatest to re-emit some kind of aggregate whenever a substream yields a value, or flatMapLatest to flatten into a single stream). I imagine you'll want to use something like the active transformation in the link, as you'll need to kick out completed streams because they will interfere with many of the combining methods.

For example, consider the following method:

function getStream(all$, name) {
  return active(all$.filter(x => x.name === name).map(x => x.stream))
    .flatMapLatest(x => Rx.Observable.merge(x));
}

From here, you could set things up like so:

const publishStream$ = new Rx.Subject();
const foo$ = getStream(publishStream$, 'foo');
const bar$ = getStream(publishStream$, 'bar');

You could then subscribe:

const fooSub = foo$.subscribe(x => console.log('foo: ' + x));
const barSub = bar$.subscribe(x => console.log('bar: ' + x));

And you could push data through (you wouldn't really use Subjects; this is just for the example):

const fooSourceA$ = new Rx.Subject();
const fooSourceB$ = new Rx.Subject();
const barSourceA$ = new Rx.Subject();
const barSourceB$ = new Rx.Subject();
publishStream$.onNext({ name: 'foo', stream: fooSourceA$ });
publishStream$.onNext({ name: 'foo', stream: fooSourceB$ });
publishStream$.onNext({ name: 'bar', stream: barSourceA$ });
publishStream$.onNext({ name: 'bar', stream: barSourceB$ });
fooSourceA$.onNext('hello');
fooSourceA$.onNext('world');
barSourceA$.onNext('testing');
barSourceB$.onNext('123');

If I've understood what you want correctly, this will do the job. To explain what's going on here, getStream takes a stream of objects that contain a name and a stream, filters to only those objects with the specified name, throws the name away and retains just the stream, and runs it through the active function from the linked answer, which will function like a scan reduction. The result will be a stream of arrays of streams, which we subsequently flatten into a single stream using flatMapLatest and merge. This means that all streams for the given name will be aggregated into a single stream, which can be subscribed to as desired.

Here's the whole thing working: https://jsbin.com/liwiga/2/edit?js,console,output

One caveat: using flatMapLatest and merge with the output of active like this will not work well when fed cold observables, as each time the array of active observables is emitted, subscriptions begin anew.

Matt Burnell
  • Thanks for your comment. The answer you provided is very close (also your other answer is great), but this deals with keeping streams active when some have finished (which I also want!) but I can't get it to re-update when new producers arrive after consumers have already gotten a stream. I have edited and updated my code and would really appreciate some help. – brokenalarms Apr 15 '16 at 02:16
  • I've updated my answer with an example. I hope I've properly understood what you're after, and that the example helps. The example has the "consumers" registering before the "producers", and everything works ok. – Matt Burnell Apr 18 '16 at 13:12
  • I still get an Observable out of the end of the subscription function from this, rather than the actual data. I had to make some changes for my use of RxJS 5, so maybe it's my use of 'Subscriber' rather than 'Disposable'? That's what the migration docs say to use, and I can't see anything else different in our code (edited: added mine). – brokenalarms Apr 18 '16 at 17:43
  • Can you confirm the jsbin example is doing what you want / expect? If so, I'll translate everything to rxjs5 and see what goes wrong. – Matt Burnell Apr 18 '16 at 22:34
  • I've added JSBin [here](http://jsbin.com/hatogihoxe/1/edit?html,js,console) using RxJS5. I've had to remove the `activeStreams` function at present as I can't get `using` working - the API hasn't changed in RxJS5 so I don't know why that's happening. But I'd also want late arriving consumers to receive the last of the combined active producers. Currently I have it implemented with producers exposing a stream to the dataPlane, which then multicasts to a non-completing ReplaySubject, which the consumers then receive an observable of. This results in a call stack a mile deep... really not ideal. – brokenalarms Apr 19 '16 at 23:30
  • I have an updated gist [here](https://jsbin.com/zidurenowo/edit?js,console,output) for RxJS5, but I cannot get `.using` working - there appears to be no migration for this operator, yet it no longer works. I have also moved the `publishStream` operations to where I would want all the subscriptions to still receive the cached last value of them (as I do currently using ReplaySubject imperatively). Thanks! – brokenalarms Apr 28 '16 at 02:56
  • Looks like the version jsbin uses is missing `using`, though it appears in the source on github. It's not critical to a demo solution, as it's just for cleanup. Having said that, I took a quick look and there are a bunch of other conversion issues. I'll take another look at it when I get a chance – Matt Burnell Apr 28 '16 at 22:32
  • Hi Matt, several months later and I am still trying to implement this :/ I have re-phrased my question and include comments in this paste bin: https://jsbin.com/bogifilijo/1/edit?js,console,output . Hopefully this can help someone to answer it - the issue is that I can't have a stream 'wrapped' with an 'active' function as this is imperative and happens once only - I need active streams to be dynamically evaluated each time there is a new stream published. – brokenalarms Jul 20 '16 at 17:42