1

I have 2 services in my application. A youTube service that loads youTube comment threads and comments over the wire and a comment service that manages the loading of batches of comments. The comments service is specific to my app, the youTube service is app agnostic.

I have a function getCommentThreadsForChannel that loads comment threads. The main implementation for this is in the youTube service but this is called on the comment service which basically just calls returns the observable from the youTube service.

As far as my controller that calls this is concerned this is just an observable sequence of comment threads. However, in my commentService I want to store these threads locally. I want to batch this into storing all the threads whenever I get 100 more threads so that I am not processing the list for every single new bit of data. I have come up with this code:

    getCommentThreadsForChannel(): Rx.Observable<ICommentThread> {
        var threadStream: Rx.Observable<ICommentThread> =
            this.youTubeService.getCommentThreadsForChannel();

        threadStream
            .bufferWithCount(100)
            .scan( ( allItems, currentItem ) => {
                currentItem.forEach(thread => {
                    allItems.push(thread);
                });

                console.log( `Save items to local storage: ${allItems.length}` )

                return allItems;
            }, []  );

        return threadStream;
    }

I think that the logic here for batching the threads and for accumulating all of the threads into one array is fine but this code is never called. I presume that this is because I am not subscribing to this thread at all.

I do not want to subscribe in here as that would subscribe to the underlying stream and I would then have 2 subscriptions and all the data would load twice (there is a LOT of data - it takes about a minute to load it all and over 30 calls of 100 threads at a time).

I basically want a do here that will not affect the stream that is passed to the controller but I want to use the buffering and accumulating logic of RXjs.

I presume that I need to share or publish the stream in some way but I have little success using these operators before and can't see how I would be able to do it without adding a second subscribe.

How do I share one stream and use it in 2 different ways without subscribing twice? Can I create some sort of passive stream that only subscribes when the observable it is based on subscribes?

Roaders
  • 4,373
  • 8
  • 50
  • 71

2 Answers2

1

I figured this out in the end (with some help from @MonkeyMagiic).

I had to share the stream so that I could do 2 different things with the data, buffer it AND at on each value individually. The problem was that both of these streams had to be subscribed to but I didn't want to subscribe in the service - that should be done in the controller.

The solution was to combine the 2 streams again and ignore values from the buffer:

var intervalStream = Rx.Observable.interval(250)
    .take(8)
    .do( function(value){console.log( "source: " + value );} )
    .shareReplay(1);

var bufferStream = intervalStream.bufferWithCount(3)
    .do( function(values){
      console.log( "do something with buffered values: " + values );
    } )
    .flatMap( function(values){ return Rx.Observable.empty(); } );

var mergeStream = intervalStream.merge( bufferStream );

mergeStream.subscribe(
  function( value ){ console.log( "value received by controller: " + value ); },
  function( error ){ console.log( "error: " + error ); },
  function(){ console.log( "onComplete" ); }
);

Output:

"source: 0"
"value received by controller: 0"
"source: 1"
"value received by controller: 1"
"source: 2"
"value received by controller: 2"
"do something with buffered values: 0,1,2"
"source: 3"
"value received by controller: 3"
"source: 4"
"value received by controller: 4"
"source: 5"
"value received by controller: 5"
"do something with buffered values: 3,4,5"
"source: 6"
"value received by controller: 6"
"source: 7"
"value received by controller: 7"
"do something with buffered values: 6,7"
"onComplete"

JSBin

Roaders
  • 4,373
  • 8
  • 50
  • 71
0

I believe what you are looking for is a combination of share and replay, thankfully RX has the shareReplay(bufferSize).

var threadStream: Rx.Observable<ICommentThread> = this.youTubeService
                .getCommentThreadsForChannel()
                .shareReplay(1); 

getCommentThreadsForChannel(): Rx.Observable<ICommentThread> {
        threadStream
            .bufferWithCount(100)
            .scan( ( allItems, currentItem ) => {
                currentItem.forEach(thread => {
                    allItems.push(thread);
                });

                console.log( `Save items to local storage: ${allItems.length}` )

                return allItems;
            }, []  );

        return threadStream;
    }

Using the multicast operator share will ensure that that each additional subscription after the first does not make the unnecessary request and the replay is to ensure all future subscriptions receive the last notification.

MonkeyMagiic
  • 126
  • 1
  • 7
  • It's worth noting that `shareReplay`, as it stands, has been removed from the latest RxJS 5 beta. [More info here](http://stackoverflow.com/questions/35246873/sharereplay-in-rxjs-5). – Philip Bulley Feb 14 '16 at 12:29
  • Thanks for the reply but it's not working I'm afraid. I think that the problem is that the thread stream is being subscribed to when it is returned from that function but the bufferWithCount and scan stream is not being subscribed to. As I mentioned before I do not want to subscribe here, I only want this to work when the thread stream is subscribed to. – Roaders Feb 14 '16 at 14:22
  • I am getting a little lost (sorry), the entire chain of observables will be subscribed to as a unit, that is {observable1}.{observable2}.{observable3} will send down the chain from the top Notification --> observable1 through to observable2. – MonkeyMagiic Feb 14 '16 at 15:21
  • as an addition, I can guarantee you are subscribing to the bufferWithCount and the scan if you are subscribing to the threadStream. – MonkeyMagiic Feb 14 '16 at 15:24
  • using bufferWithCount(100) would indicate you are waiting for 100 notifications to be released off getCommentThreadsForChannel(), would that be the behaviour you wanted? can you please paste what that method looks like? – MonkeyMagiic Feb 14 '16 at 15:38