
This RxJava buffer example (with marble chart!) describes the desired result perfectly:

collect items in buffers during the bursty periods and emit them at the end of each burst, by using the debounce operator to emit a buffer closing indicator to the buffer operator

Edit: having reviewed How to create a RxJS buffer that groups elements in NodeJS but that does not rely on forever running interval?, my issue appears related to using a Subject as opposed to straight Observable.

Using the socket stream to generate window close events (as follows) results in 2 sockets opened and no events streaming out:

ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver);
var closer = ws.flatMapFirst(Rx.Observable.timer(250));
ws.buffer(closer)
    .subscribe(function(e) { console.log(e, 'socket messages');});
mygzi
  • What is the question? – user3743222 Nov 03 '15 at 09:52
  • Let's start with 'how to reference the source observable itself in the bufferClosingSelector.' I find no examples on this subject. – mygzi Nov 03 '15 at 16:04
  • http://stackoverflow.com/questions/33402737/how-to-create-a-rxjs-buffer-that-groups-elements-in-nodejs-but-that-does-not-rel – user3743222 Nov 03 '15 at 16:15
  • That was helpful - my issue turns out to be one layer deeper. See edits. – mygzi Nov 03 '15 at 19:05
  • For this type of problem, an example with jsfiddle or jsbin would be really useful. – user3743222 Nov 03 '15 at 23:01
  • That should allow us to discriminate what is happening. My intuition tells me there is a hot vs. cold issue here, so try adding `.share()` to `ws`. Hopefully only one socket is created then. Then tap `ws` to see what is emitted, i.e. something like `ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver).tap(*log_fn*).share()`. – user3743222 Nov 04 '15 at 10:47
  • Yes! .share()'ing the ws Subject resolves the issue and I can now buffer based on the debounced input stream itself. If you'd like to refile as an answer I will accept. – mygzi Nov 04 '15 at 18:53

2 Answers


Summarizing the findings here:

  • Rx.DOM.fromWebSocket returns an Rx.Subject which wraps the websocket. That subject is built from one observer and one observable (via new Rx.Subject(observer, observable)). As far as I understand, the observer lets you write to the socket via its onNext method, while the observable lets you read from the socket.
  • You often read that subjects are hot sources, but here that apparently only means that the observer immediately pushes its value to the subject, which in turn pushes it to the socket. In the normal case (new Rx.Subject()), the default observer and observable are wired so that the observable listens to the observer, hence the default observable is hot. Here, however, the observable is a cold source, so every subscription re-executes the callback that creates the websocket. Hence the two sockets: ws.buffer(closer) subscribes to ws once for the items and once more through closer.
  • This does not happen with, for instance, Rx.DOM.fromEvent, because the (cold) observable it creates is shared (via publish().refCount()).
  • Doing the same here solves the duplication issue. In this particular case, use ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver).share(); in your code, share being an alias for publish().refCount().
  • I have to wonder whether this behaviour of Rx.DOM.fromWebSocket should be reported as a bug.
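
To make the cold vs. shared distinction above concrete, here is a minimal sketch in plain JavaScript. It does not use RxJS: coldSource, share and serverSend are hypothetical stand-ins written for illustration, and the share helper only mimics the "connect once, fan out to all subscribers" part of publish().refCount() (unsubscription and disconnecting are left out).

```javascript
// Hypothetical model, not RxJS: each entry is the listener of one open "socket".
var connections = [];

// A cold source: every subscribe() call opens its own connection.
function coldSource() {
  return {
    subscribe: function (onNext) {
      connections.push(onNext);
    }
  };
}

// Simplified stand-in for share(): subscribe to the source only once and
// forward each value to every registered listener.
function share(source) {
  var listeners = [];
  var connected = false;
  return {
    subscribe: function (onNext) {
      listeners.push(onNext);
      if (!connected) {
        connected = true;
        source.subscribe(function (value) {
          listeners.forEach(function (listener) { listener(value); });
        });
      }
    }
  };
}

// Simulates the server pushing one message down every open connection.
function serverSend(value) {
  connections.forEach(function (deliver) { deliver(value); });
}

// Two subscriptions to the cold source open two connections.
var cold = coldSource();
cold.subscribe(function () {});
cold.subscribe(function () {});
var coldConnectionCount = connections.length;

// Two subscriptions to the shared source open a single connection.
connections = [];
var shared = share(coldSource());
var received = [];
shared.subscribe(function (v) { received.push('A:' + v); });
shared.subscribe(function (v) { received.push('B:' + v); });
var sharedConnectionCount = connections.length;
serverSend('hello');
```

In this model the unshared source ends up with two connections, which mirrors the two sockets seen in the question, while the shared one keeps a single connection and still delivers each message to both subscribers.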

Code for both methods:

user3743222
  • Can you include the actual workaround here, e.g. source = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver).share() – mygzi Nov 05 '15 at 23:44

You can pass an Observable directly to the buffer operator, just like in the RxJava version:

source.buffer(source.debounce(150))

is valid. See here.

The alternative syntax using the selector method that you show will invoke that method every time a buffer closes and then subscribe to the Observable it produces.

Also, in the RxJava example the emitted values are the output of the buffer operator; debounce on its own does not emit accumulated results.
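
As a rough illustration of what source.buffer(source.debounce(150)) is expected to produce, the sketch below models the grouping offline on timestamped items, without RxJS or timers. The function name bufferByQuietGap, the { time, value } item shape and the sample data are assumptions made for this example; a gap exactly equal to the quiet period is treated here as closing the burst, which is a choice of the sketch and may not match how a real scheduler orders the two events.

```javascript
// Hypothetical offline model: split timestamped items into bursts, closing a
// buffer whenever the gap since the previous item reaches quietMs.
function bufferByQuietGap(events, quietMs) {
  var buffers = [];
  var current = [];
  var lastTime = null;

  events.forEach(function (event) {
    // A quiet period has elapsed: the debounced closer would have fired,
    // so the items collected so far are emitted as one buffer.
    if (lastTime !== null && event.time - lastTime >= quietMs) {
      buffers.push(current);
      current = [];
    }
    current.push(event.value);
    lastTime = event.time;
  });

  // The last burst is emitted once the source goes quiet or completes.
  if (current.length > 0) {
    buffers.push(current);
  }
  return buffers;
}

// Sample data (made up): three bursts separated by gaps longer than 150 ms.
var bursts = bufferByQuietGap([
  { time: 0, value: 'a' },
  { time: 10, value: 'b' },
  { time: 20, value: 'c' },
  { time: 400, value: 'd' },
  { time: 410, value: 'e' },
  { time: 900, value: 'f' }
], 150);
```

With the real operators the source would also need to be shared, as discussed in the other answer, so that the buffer and the debounced closer read from the same subscription.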

paulpdaniels