10

I'm producing a sequence of 50 items every three seconds. I then want to batch them into chunks of at most 20 items, but also not wait more than one second before I release the buffer.

That works great!

But since the interval never dies, Buffer keeps firing empty batch chunks...

How can I avoid that? Sure, Where(buf => buf.Count > 0) should help - but that seems like a hack.

Observable
    .Interval(TimeSpan.FromSeconds(3))
    .Select(n => Observable.Repeat(n, 50))
    .Merge()
    .Buffer(TimeSpan.FromSeconds(1), 20)
    .Subscribe(e => Console.WriteLine(e.Count));

Output:

0-0-0-20-20-10-0-20-20-10-0-0-20-20
svick
  • 236,525
  • 50
  • 385
  • 514
Lars Corneliussen
  • 2,513
  • 2
  • 26
  • 36

3 Answers

7

The Where filter you propose is a sound approach; I'd go with that.

You could perhaps wrap the Buffer and Where into a single helper method, named to make the intent clearer, but rest assured the Where clause is idiomatic Rx in this scenario.
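For instance, here is a minimal sketch of such a helper (the class and method name are mine, not part of Rx):

// requires: using System; using System.Collections.Generic; using System.Reactive.Linq;
public static class BufferExtensions
{
    // Buffers by time and count as usual, then drops the empty lists
    // that Buffer emits while the source is quiet.
    public static IObservable<IList<T>> BufferNonEmpty<T>(
        this IObservable<T> source, TimeSpan timeSpan, int count)
    {
        return source.Buffer(timeSpan, count)
                     .Where(buffer => buffer.Count > 0);
    }
}

// Usage: source.BufferNonEmpty(TimeSpan.FromSeconds(1), 20).Subscribe(...);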

Think of it this way: an empty Buffer is relaying the information that no events occurred in the last second. While you can argue that this is implicit, it would require extra work to detect if Buffer didn't emit an empty list. It just so happens it's not information you are interested in - so Where is an appropriate way to filter it out.

A lazy timer solution

Following from your comment ("...the timer... be[ing] lazily initiated..."), you can do this to create a lazy timer and omit the zero counts:

var source = Observable.Interval(TimeSpan.FromSeconds(3))
                       .Select(n => Observable.Repeat(n, 50))
                       .Merge();

var xs = source.Publish(pub =>
    pub.Buffer(() => pub.Take(1).Delay(TimeSpan.FromSeconds(1)) // close when the first buffered item is 1 second old...
                        .Merge(pub.Skip(19)).Take(1)));         // ...or when the 20th item arrives, whichever comes first

xs.Subscribe(x => Console.WriteLine(x.Count));

Explanation

Publishing

This query requires subscribing to the source events multiple times. To avoid unexpected side-effects, we use Publish to give us pub, a stream that multicasts the source, creating just a single subscription to it and effectively giving us a "hot" version of the source stream.

In this case, this is necessary to ensure that the buffer closing streams produced after the first one start with the current events - if the source were cold, they would start over from the beginning each time. I wrote a bit about publishing here.
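If it helps, here is a small illustrative sketch (not from the answer above) of what Publish buys us: a cold source re-runs its subscription side-effects for every subscriber, whereas inside Publish(pub => ...) every use of pub shares one underlying subscription.

// requires: using System; using System.Reactive.Linq;
var cold = Observable.Defer(() =>
{
    Console.WriteLine("Subscribed to the source");   // side-effect runs per subscription
    return Observable.Range(0, 3);
});

// Two direct subscriptions: the side-effect (and the whole sequence) runs twice.
cold.Subscribe(x => Console.WriteLine($"A: {x}"));
cold.Subscribe(x => Console.WriteLine($"B: {x}"));

// Inside Publish, both uses of pub share a single subscription to the source,
// so the side-effect runs once and both queries see the same events.
cold.Publish(pub => pub.Zip(pub.Skip(1), (a, b) => a + b))
    .Subscribe(x => Console.WriteLine($"Zipped: {x}"));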

The main query

We use an overload of Buffer that accepts a factory function, called once for each new buffer, which returns an observable stream whose first event is the signal to close the current buffer.

In this case, we want to terminate the buffer when either the first event into the buffer has been there for a full second, or when 20 events have appeared from the source - whichever comes first.

To achieve this we Merge streams that describe each case - the Take(1).Delay(...) combo describes the first condition, and the Skip(19).Take(1) describes the second.
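Spelled out as a separate method (the method name and the Unit conversion are my additions, purely for readability), the closing signal looks like this:

// requires: using System.Reactive; (for Unit)
// Closes the current buffer when the first buffered item is one second old,
// or when the 20th item arrives - whichever happens first.
static IObservable<Unit> BufferClosing(IObservable<long> pub)
{
    var firstItemAged  = pub.Take(1).Delay(TimeSpan.FromSeconds(1));
    var twentiethItem  = pub.Skip(19);

    return firstItemAged.Merge(twentiethItem)
                        .Take(1)
                        .Select(_ => Unit.Default);
}

// var xs = source.Publish(pub => pub.Buffer(() => BufferClosing(pub)));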

However, I would still test the performance of the easy Buffer + Where approach first, because I still suspect this is overkill - but a lot depends on the precise details of the platform and scenario.

James World
  • 29,019
  • 9
  • 86
  • 120
  • But this means the system is doing something every second - although it is totally unnecessary. In my real example it is every 20ms - which is worse... – Lars Corneliussen Feb 03 '14 at 12:00
  • The only additional thing it is doing is invoking the `Where` delegate - this is very minimal overhead (check the source code). If you want less, you'll need to rewrite `Buffer`, because that is the thing doing the periodic work. I do think you are worrying unnecessarily though - I highly doubt this will be the source of your performance woes. Checking a List count is O(1). – James World Feb 03 '14 at 12:04
  • By the way, with a 20ms interval don't expect much accuracy - it's very tight considering .NET timer resolution is 15ms. – James World Feb 03 '14 at 12:09
  • Not worrying about the `Count`-check - worrying about the timer being active all day while it could be lazily initiated when a value arrives... but maybe these worries are as unnecessary as the one about the `Counts`. (From JS - I'd rather go with chained setTimeouts than with a single `setInterval`...) – Lars Corneliussen Feb 03 '14 at 12:30
  • See edited answer for a lazy timer buffer, effectively creating a chained timer as you describe. – James World Feb 03 '14 at 13:46
  • Throttle wasn't quite right... edited it to use Delay instead. – James World Feb 03 '14 at 20:15
  • I can't wrap my head around it. Could you try to explain? – Lars Corneliussen Feb 04 '14 at 20:19
  • Ok :) I agree it's not obvious. I added some extra detail, does that help? – James World Feb 04 '14 at 22:54
  • Wonderful!! :-) - - But yes, I'll check which to go for. – Lars Corneliussen Feb 04 '14 at 23:03
  • I might be doing something wrong, but testing this solution with System.Reactive 5.0 produces sometimes buffers with 21 items instead of 20. Even worse, the timer of the buffer sometimes follows the second element instead of the first, resulting in the violation of the 1 sec maximum latency policy for the first element in the buffer. I have found Alex's `GroupByUntil`-based [solution](https://stackoverflow.com/a/36603471/11178549) to be more reliable for my needs. – Theodor Zoulias Jul 13 '22 at 22:46
4

After using the accepted answer for quite a while I would now suggest a different implementation (inspired by James' Skip / Take approach and this answer):

var source = Observable.Interval(TimeSpan.FromSeconds(3))
    .Select(n => Observable.Repeat(n, 50))
    .Merge();

var xs = source.BufferOmitEmpty(TimeSpan.FromSeconds(1), 20);

xs.Subscribe(x => Console.WriteLine(x.Count));

With an extension method BufferOmitEmpty like:

public static IObservable<IList<TSource>> BufferOmitEmpty<TSource>(this IObservable<TSource> observable, TimeSpan maxDelay, int maxBufferCount)
{
    return observable
        // Put every element into a single group; the group is closed either when maxDelay
        // elapses or when the maxBufferCount-th element arrives, whichever happens first.
        .GroupByUntil(x => 1, g => Observable.Timer(maxDelay).Merge(g.Skip(maxBufferCount - 1).Take(1).Select(x => 1L)))
        // Collect each closed group into an array and switch to the most recent group.
        .Select(x => x.ToArray())
        .Switch();
}

It is 'lazy', because no groups are created as long as there are no elements on the source sequence, so there are no empty buffers. As in Tom's answer, there is another nice advantage over the Buffer / Where implementation: the buffer is started when the first element arrives. So elements that follow each other within the buffer time after a quiet period are processed in the same buffer.
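For example, a rough console sketch (my own test harness, using a Subject to stand in for the source) showing the lazy start and the absence of empty buffers:

// requires: using System; using System.Reactive.Subjects; using System.Threading;
var subject = new Subject<int>();

subject.BufferOmitEmpty(TimeSpan.FromSeconds(1), 20)
       .Subscribe(buf => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: {buf.Count} items"));

// Nothing is printed while the subject stays silent - no empty buffers.
subject.OnNext(1);      // the first element opens a buffer and starts the 1-second timer
subject.OnNext(2);      // arrives within the buffer time, joins the same buffer
Thread.Sleep(1500);     // after ~1 second the buffer [1, 2] is emitted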

Why not use the Buffer method

Three problems occurred when I was using the Buffer approach (they might be irrelevant to the scope of the question, so this is a warning to people who, like me, use Stack Overflow answers in different contexts):

  1. Because of the Delay, one thread is used per subscriber.
  2. In scenarios with long-running subscribers, elements from the source sequence can be lost.
  3. With multiple subscribers, it sometimes creates buffers with a count greater than maxBufferCount.

(I can supply sample code for 2. and 3., but I'm unsure whether to post it here or in a different question, because I cannot fully explain why it behaves this way.)

Community
  • 1
  • 1
Alex
  • 293
  • 2
  • 10
  • +1. I should mention that this solution not only prevents empty buffers, but it also results in a clear and intuitive timing behavior: the timer starts when the first element in the buffer is received. So essentially it enforces a maximum latency policy: no element will be queued for more than `maxDelay` duration, while reducing the frequency of emission to the minimum required for enforcing the aforementioned policy. – Theodor Zoulias Jul 13 '22 at 22:56
0

RxJS 5 has hidden features buried in its source code. It turns out this is pretty easy to achieve with bufferTime.

From the source code, the signature looks like this:

export function bufferTime<T>(this: Observable<T>, bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler?: IScheduler): Observable<T[]>;

So your code would be like this:

observable.bufferTime(1000, null, 20)
magicgregz
  • 7,471
  • 3
  • 35
  • 27