In my WPF application using .NET 4.6, I have an event that fires new data points at a high rate (several hundred per second), but not all the time. This data is displayed in a chart.

I would like to update the chart every 50 ms rather than after each new data point.
To achieve that, I thought of using Buffer(TimeSpan.FromMilliseconds(50)) from Rx, which in theory works fine. However, my subscriber is also called every 50 ms when no new data points have arrived, which is not what I want.

I created a little sample application to test that out:

using System;
using System.Reactive.Linq;

namespace RxTester
{
    public class Program
    {
        private static event EventHandler TheEvent;

        static void Main(string[] args)
        {
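            // Adapt the plain .NET event into an observable sequence of EventArgs.
            var observable = Observable.FromEvent<EventHandler, EventArgs>(h => (s, e) => h(e), h => TheEvent += h, h => TheEvent -= h);
            // Collect events into fixed time windows (1 s here to keep the output readable; the real chart would use 50 ms).
            var subscriber = observable.Buffer(TimeSpan.FromMilliseconds(1000))
                .Subscribe(e => Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: {e.Count} elements received..."));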

            var random = new Random();
            var timer = new System.Timers.Timer(2000)
                {
                    AutoReset = true,
                    Enabled = true
                };
            timer.Elapsed += (s, e) =>
                {
                    var amount = random.Next(1, 10);
                    for (int i = 0; i < amount; ++i)
                        TheEvent?.Invoke(null, null);
                };

            Console.ReadLine();

            timer.Enabled = false;
            subscriber.Dispose();
        }
    }
}

To run it, you need to add the "Rx-Linq" NuGet package, or you can use the following Fiddle: https://dotnetfiddle.net/TV5tD4

There you see several "0 elements received" lines, which is what I would like to avoid. I know I could simply check for e.Count == 0, but since I use several such buffers, this does not seem optimal to me.

Is there a way to create new buffered blocks of elements only if elements are available?
I am also open to other approaches for batching events on a time basis. I already looked into TPL Dataflow's BatchBlock, but that seems to support only count-based block sizes.

Christoph Fink
  • Somewhat related: [RX - Group/Batch bursts of elements in an observable sequence](https://stackoverflow.com/questions/29858974/rx-group-batch-bursts-of-elements-in-an-observable-sequence) and [Why does Rx buffer continuously perform method when buffer contains no items?](https://stackoverflow.com/questions/30081331/why-does-rx-buffer-continuously-perform-method-when-buffer-contains-no-items) – Theodor Zoulias Jul 13 '22 at 23:06

2 Answers

Once again, we can use the powerful GroupByUntil method to create this extension:

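using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// The class name is illustrative; extension methods must live in a static class.
public static class ObservableExtensions
{
    // A group opens when an element arrives and closes once `threshold` has elapsed,
    // so no list is produced while the source is idle.
    public static IObservable<IList<TSource>> BufferWhenAvailable<TSource>(
        this IObservable<TSource> source, TimeSpan threshold)
    {
        return source.Publish(sp =>
            sp.GroupByUntil(_ => true, _ => Observable.Timer(threshold))
              .SelectMany(i => i.ToList()));
    }
}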
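
To see how this behaves during idle periods, here is a minimal sketch that drives the same GroupByUntil pipeline with virtual time. It assumes the System.Reactive package. The extra `IScheduler` parameter, the `Subject<int>` source, the `HistoricalScheduler` clock, and the `BufferWhenAvailableSketch` and `Run` names are additions made for this illustration only and are not part of the extension above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BufferWhenAvailableSketch
{
    // Hypothetical overload: same pipeline, but the timer runs on a caller-supplied
    // scheduler so that time can be controlled in a test.
    public static IObservable<IList<T>> BufferWhenAvailable<T>(
        this IObservable<T> source, TimeSpan threshold, IScheduler scheduler)
    {
        return source.Publish(sp =>
            sp.GroupByUntil(_ => true, _ => Observable.Timer(threshold, scheduler))
              .SelectMany(g => g.ToList()));
    }

    // Feeds a few values through the pipeline and records each emitted batch.
    public static List<string> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced manually
        var source = new Subject<int>();
        var batches = new List<string>();

        using (source.BufferWhenAvailable(TimeSpan.FromMilliseconds(50), scheduler)
                     .Subscribe(b => batches.Add(string.Join(",", b))))
        {
            source.OnNext(1);                                     // opens a group and starts its 50 ms timer
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(20));
            source.OnNext(2);                                     // joins the group that is still open
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(30));   // timer elapses, the group is closed
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(200));  // idle period: no group is open
            source.OnNext(3);                                     // opens a fresh group
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(50));   // second group is closed
        }

        return batches;
    }
}
```

Note that each batch starts when its first element arrives, not on a fixed 50 ms grid, so this differs from Buffer(TimeSpan) in more than the missing empty lists.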
supertopi

The standard way of doing this is simply:

.Buffer(period)
.Where(buffer=>buffer.Any())

This is effectively what you want to avoid (the Count == 0 check). However, the check is very cheap, and I would imagine it is far cheaper than the other costs involved, i.e. scheduling. The only concern might be the number of allocations that are happening (a new List<T> every 50 ms) and the Gen 0 GC pressure that may build up as a result.
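
As a rough illustration of that pattern, the sketch below runs Buffer followed by Where against a virtual clock. It assumes the System.Reactive package; the `Subject<int>` source, the `HistoricalScheduler`, and the `BufferDemo` and `Run` names are stand-ins chosen for this example, not something from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BufferDemo
{
    // Records the size of every buffer that reaches the subscriber.
    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced manually
        var source = new Subject<int>();
        var counts = new List<int>();

        using (source.Buffer(TimeSpan.FromMilliseconds(50), scheduler)
                     .Where(buffer => buffer.Count > 0)           // drop the empty windows
                     .Subscribe(buffer => counts.Add(buffer.Count)))
        {
            source.OnNext(1);
            source.OnNext(2);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(50));   // first window closes holding two items
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(100));  // two empty windows, filtered by Where
            source.OnNext(3);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(50));   // next window closes holding one item
        }

        return counts;
    }
}
```

The empty lists are still allocated inside Buffer on every tick; Where only keeps them from reaching the subscriber.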

Lee Campbell
  • Thanks for your input, but `The only concern might be the amount allocations that are happening` is exactly why I want to avoid this, as I may have several of these buffers at once... – Christoph Fink Feb 25 '16 at 05:52
  • I hear ya. Sounds like it would be a good one to add to the toolbox. If I come up with a solution to this, I will get back to you: https://github.com/LeeCampbell/RxCookbook/issues/27 – Lee Campbell Feb 25 '16 at 06:18
  • @LeeCampbell if I understand correctly: even if you prevent `Buffer(TimeSpan)` from yielding empty buffers, the solution is still different. The buffers are "started" based on the timer, not when a new value arrives. – supertopi Feb 25 '16 at 07:10
  • Yes. Buffer will use constant cadence windows. If you want to start buffers based on when values arrive, then you probably want to look into the `GroupJoin` operator http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html and produce something custom yourself. – Lee Campbell Feb 26 '16 at 00:43
  • Upvote on supertopi's answer (though I would like to see the supporting unit tests) – Lee Campbell Feb 26 '16 at 00:50