4

I'm capturing events from an application using Rx.Observable.fromEvent in a NodeJS. These are sent to another server using request (https://www.npmjs.com/package/request). To avoid a high network load I need to buffer those events at a given timeout between sent requests.

Problem

Using bufferWithTime(200) will keep the node process running and I can't know when the application has finished to close the stream.

Is there any way to use Rx buffers to say:

  1. When Element 1 is pushed set a timer
  2. When Element 2 and 3 arrive before the timer expires push them to an array [1, 2, 3] (the buffer)
  3. When the timer expires, send the [1, 2, 3] array down the pipe.
  4. If Element 4 came after the timer expires then set a new timer and start all over again.

If no element is pushed then no timer is started which would make the process exit.

My initial approach was:

Rx.Observable
     .fromEvent(eventEmitter, 'log')
     .bufferWithTime(200) // this is the issue
     .map(addEventsToRequestOption)
     .map(request)
     .flatMap(Promise.resolve)
     .subscribe(log('Response received'))
royhowie
  • 11,075
  • 14
  • 50
  • 67
  • Maybe a combination of `buffer` and `timeout`? Cf. http://stackoverflow.com/questions/33306961/rxjs-distinguish-single-click-from-drag, i.e. put your `timeout` at the end with the amount of time after which you want to close the stream if no event came into the buffer. – user3743222 Oct 29 '15 at 02:32
  • With reference to [the documentation](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md), your method chain switches, after `.fromEvent()` from *Observable method* to *Observable Instance methods*. Are you sure that's allowed? – Roamer-1888 Oct 29 '15 at 03:45
  • @user3743222 I might try that but if I don't know when I'll receive events closing the stream prematurely might mean missing some late events. A closed stream is reopened by a new event being pushed? – Constantin Dumitrescu Oct 29 '15 at 11:45
  • @Roamer-1888 Well, I assumed that I need to subscribe to an Observable instance and filter the events from that. Is it something I'm missing from how I should be using Rx? – Constantin Dumitrescu Oct 29 '15 at 11:46
  • @ConstantinDumitrescu, TBH, I don't really know Rx but looking at the documentation again, it would appear that method chains are of the general form `Rx.Observable.A(...).B1().B2().B3()....Bn();` where A is an "Observable Method" (which returns an instance) and B1, B2 etc are "Observable Instance Methods". The documentation doesn't really explain that properly but it can be inferred from the examples. Therefore the general form of your code would appear to be OK. Sorry to have cast doubt. – Roamer-1888 Oct 29 '15 at 13:57
  • @Constantin Dumitrescu: then I don't understand that part : `If no element is pushed then no timer is started which would make the process exit.` That contradicts your point 4. How do you know no element will be pushed any longer? In any case, the point 1-4 should be implementable I think through the version of the `buffer` operator with closing selector. That closing selector could be your event source/observable delayed by the timer that you mentioned. – user3743222 Oct 29 '15 at 15:22
  • 1
    @Roamer-1888 yup that's also what I understood, you "instantiate an observable" then "add actions to the stream" and finally call Bn() to capture the result. I'm also learning this so thanks for explaining it like that, I got another insight on how things work! – Constantin Dumitrescu Oct 31 '15 at 22:53
  • @user3743222 The problem I had was that the buffer kept starting timers regardless if elements were pushed in the stream. This caused the node process to hang. I needed something that buffers only when it starts receiving things, in other words - capture everything that you can but don't have any expectations :) The answer I accepted does exactly this. Also, sorry I wasn't explicit enough, thank you very much for giving a heads up with linking buffers and timeouts! It was on the line of what I eventually ended up doing. – Constantin Dumitrescu Oct 31 '15 at 22:57

2 Answers2

3

A proposed implementation, using the delay operator :

function emits(who){
  return function (x) { console.log([who, "emits"].join(" ") + " " + x + " click(s)");};
}

var source = Rx.Observable.fromEvent(document.body, 'click');
console.log("running");

var delayedSource$ = source.delay(1200);

var buffered$ = source
     .buffer(function () { return  delayedSource$;}).map(function(clickBuffer){return clickBuffer.length;})

buffered$.subscribe(emits("buffer"));

jsbin here : http://jsbin.com/wilurivehu/edit?html,js,console,output

user3743222
  • 18,345
  • 5
  • 69
  • 75
1

You'll probably need to split the stream and use the second part to trigger the first.

var source = Rx.Observable.fromEvent(eventEmitter, 'log');
var closer = source.flatMapFirst(Rx.Observable.timer(2000));

source
     .buffer(closer)
     .map(addEventsToRequestOption)
     .flatMap(function(x) { Promise.resolve(request(x)); })
     //I assume this log method returns a function?
     .subscribe(log('Response received'));

source.flatMapFirst(Rx.Observable.timer(2000)) is the important line here. It creates an Observable that generates a timer that will trigger after 2000 ms. When the first event comes in it will kick off the timer. flatMapFirst will ignore subsequent events as long as the timer is running. When the timer finally emits it will trigger the buffer to emit its current buffer and start again.

See docs on buffer with a boundary Observable

paulpdaniels
  • 18,395
  • 2
  • 51
  • 55
  • Thumbs up for the flatMapFirst! Very useful to know, thanks! The implementation works but unfortunately I need to capture also the events in between the time intervals while flatMapFirst will drop them. – Constantin Dumitrescu Oct 31 '15 at 22:49