2

I'm very impressed about RxJS and actually start working on that. However, my following nodejs code does not work as expected at least for me.

let events = new EventEmitter();
let source = Rx.Observable.fromEvent( events, 'data' );

source
    .groupBy( event => event.type )
    .flatMap( group => group.reduce( ( acc, cur ) => _.merge( acc, cur ), [] ) )
    .subscribe( ( data ) => {
        console.log( data );
    } );


events.emit( 'data', { 'type': 1, msg: 'Test 1' } );
events.emit( 'data', { 'type': 1, msg: 'Test 2' } );
events.emit( 'data', { 'type': 2, msg: 'Test 3' } );

I expect that subscribe produces some output

sfrehse
  • 1,062
  • 9
  • 22
  • What output should this produce? – martin Oct 26 '16 at 09:36
  • It actually produces nothing. I expect an array of array that are grouped according `groupBy`. `[ [ { 'type': 1, msg: 'Test 1' }, { 'type': 1, msg: 'Test 2' ], [ { 'type': 2, msg: 'Test 3' } ] }` . – sfrehse Oct 26 '16 at 09:43
  • Where do you get EventEmitter from? Is it a part of the starndard rxjs library or is it ng2 class? Is there a chance you can put this in a running jsbin? – Meir Oct 26 '16 at 09:46
  • 1
    @Meir That's a standard node.js class https://nodejs.org/api/events.html#events_class_eventemitter – martin Oct 26 '16 at 09:48
  • I will prepare a online running version. – sfrehse Oct 26 '16 at 09:49
  • Thx! Will make it easier to look into. Meanwhile, if you subscribe directly to 'events' without a groupBy, does it work? – Meir Oct 26 '16 at 09:55
  • @Meir yes without `groupBy` it works. – sfrehse Oct 26 '16 at 09:58
  • I guess http://stackoverflow.com/questions/33402737/how-to-create-a-rxjs-buffer-that-groups-elements-in-nodejs-but-that-does-not-rel?rq=1 is similar – sfrehse Oct 26 '16 at 10:10
  • Is it possible restart sending data after `onCompleted`? – sfrehse Oct 26 '16 at 13:40

3 Answers3

5

It's a slightly changed version I've created in a JSBin http://jsbin.com/cufana/edit?js,console.

The problem I'm seeing in your code is that you have an observable that does not complete. If you are performing a groupBy, the results that are being kept internally (being the grouped results) will not be pushed through as long as the source observable does not complete.

let events = new Rx.Subject();

events
    .groupBy( event => event.type)
    .flatMap(group => group.reduce((acc, curr) => [...acc, curr], []))
    .subscribe( ( data ) => {
        console.log( data );
    } );


events.next({ 'type': 1, msg: 'Test 1' } );
events.next({ 'type': 1, msg: 'Test 2' } );
events.next({ 'type': 2, msg: 'Test 3' } );
events.complete();

Here you can see I've changed the eventemitter to a subject to get a working jsbin without angular2 dependency. I'm acually completing the subject so my source observable from the groupBy get's completed. This will push the results through.

The rest of the code was pretty correct.

If the EventEmitter is indeed from Angular2, you will have problems I guess with completing this one. Can this be done from the child component?

KwintenP
  • 4,637
  • 2
  • 22
  • 30
2

The problem is that your sequence is not terminated. groupBy triggers only on termination. See jsbin example with both cases.

When you do Rx.Observable.from(...) you get a completed sequence, otherwise, you need to terminate manually. The example on groupBy documentation demonstrate this.

Meir
  • 14,081
  • 4
  • 39
  • 47
  • Does help some buffering before groupBy? – sfrehse Oct 26 '16 at 10:08
  • Nope :-) Because groupBy triggers only when the sequence's state is completed. Buffering does not complete it. I think this is quite consistent with the groupBy purpose. There is not point in grouping data on a live sequence, only once all data had been pushed through (hence it is completed), you can do the grouping and emit a result – Meir Oct 26 '16 at 10:10
2

As suggested by others the Observable never completes so any operator that requires the preceding Observable to complete will never emit anything:

The groupBy() operator however emits an instance of GroupedObservable for each group so you can subscribe to it. I know this produces a different result than you'd expected but maybe you can work with it:

const Rx = require('rxjs/Rx');
const EventEmitter = require('events');

let events = new EventEmitter();
let source = Rx.Observable.fromEvent(events, 'data');

source
    .groupBy( event => event.type )
    .subscribe( ( groupedObservable ) => {
        groupedObservable.subscribe(val => {
            console.log(groupedObservable.key, val);
        });
    } );


events.emit( 'data', { 'type': 1, msg: 'Test 1' } );
events.emit( 'data', { 'type': 1, msg: 'Test 2' } );
events.emit( 'data', { 'type': 5, msg: 'Test 3' } );

Each group has it's own GroupedObservable.

This prints to console:

1 { type: 1, msg: 'Test 1' }
1 { type: 1, msg: 'Test 2' }
5 { type: 5, msg: 'Test 3' }
martin
  • 93,354
  • 25
  • 191
  • 226