1

I send request - get an array of data. In order to manipulate that data I need to flatten it, so I can use it as a stream of entities not stream of array of entities but, then on side-effect I want those entities appear in the UI at once, not one by one, so it updates UI only ones at a time.

Let's say I have a code like this:

// this generates a sequence of objects - getCasesStream sends an ajax request,
// whenever dateRange changes 
casesStm = dateRangeStm.flatMapLatest(getCasesStream)

casesStm.subscribe((x)=> { console.log(x) })

function getCasesStream(dateRange) {
    return getCases(dateRange.startDate, dateRange.endDate)  
        // api every time returns an array,
        // but it's difficult to work with array of elements, ergo the flattening
        .flatMap((x) => x)                                
        .filter((x) => _.isNotEmpty(x.value))
        .map((caseDoc) => _.assign(caseDoc.value, {
            key: caseDoc.id
        }));
}

This works just fine, emits one value at a time. Now what I want is to emit at most 10 values, if there are less than 10 - emit whatever left.

I thought I could solve that by doing this:

casesStm
    .windowWithCount(10)
    .flatMap((x)=> x.toArray())

But then this only works if getCasesStream (after filtering) returns at least 10 items, if it has less than that - I won't even see them.

How can I buffer elements effectively here? again:

  • api sends us an array
  • to filter and add additional props to each element it's better to flatten that array (or maybe not?)
  • at the end I need to buffer (don't want to force browser to redraw each time new element comes)

Maybe I should use generic window that returns length of elements in getCasesStream but that function doesn't take any arguments, how can I get the length? I tried using windowWithTimeOrCount - that keeps emitting empty buffer every interval, even when there's no elements.

iLemming
  • 34,477
  • 60
  • 195
  • 309

1 Answers1

1

You could filter out those empty buffer and alternatively you could review some of the options mentioned in How to create a RxJS buffer that groups elements in NodeJS but that does not rely on forever running interval?.

The idea showcased there is to use the operator buffer with closing selector. As a closing selector, you could use merge(source.skip(9).take(1).repeat(), source.delay(Xms)) (or the operator flatMapFirst as proposed as well in the link thereaforementioned, review both options). In that way, in principle when there is no case emitted, there is no buffer emitted, and when a case arrive, the merge operator emits a value either on the 10th case, or Xms after, whichever is first. When the value from the merge operator is emitted, the buffer is closed and emitted.

You can take inspiration from the code here:

function emits(who){
  return function (x) { console.log([who, "emits"].join(" ") + " " + x + " click(s)");};
}

var Xms = 1700;

var source = Rx.Observable.fromEvent(document.body, 'click');
console.log("running");

var delayedSource$ = Rx.Observable.merge(source.skip(9).take(1).repeat(), source.delay(Xms));

var buffered$ = source
     .buffer(function () { return  delayedSource$;}).map(function(clickBuffer){return clickBuffer.length;});

buffered$.subscribe(emits("buffer"));

jsbin : http://jsbin.com/siqopuxoli/edit?html,js,console,output

Important note : you should also share your case source (unless you know it is a hot source already), as it will be subscribed several times: getCases(dateRange.startDate, dateRange.endDate).share()

Community
  • 1
  • 1
user3743222
  • 18,345
  • 5
  • 69
  • 75