3

My problem is sort of like the one the Nagle algorithm was created to solve, but not exactly. What I'd like is to buffer the OnNext notifications from an IObservable<T> and re-emit them in batches as an IObservable<IList<T>>, like so:

  1. When the first T notification arrives, add it to a buffer and start a countdown
  2. If another T notification arrives before the countdown expires, add it to the buffer and restart the countdown
  3. Once the countdown expires (i.e. the producer has been silent for some length of time), forward all the buffered T notifications as a single aggregate IList<T> notification.
  4. If the buffer size grows beyond some maximum before the countdown expires, send it anyway.
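
The four rules above can be checked without any Rx machinery by replaying timestamped inputs. What follows is a minimal sketch, not an Rx operator: the `BufferRules` class, its `Simulate` and `Demo` methods, the parameter names and the sample timestamps are all hypothetical. It assumes the inputs arrive in non-decreasing time order and that "beyond some maximum" means "flush as soon as the buffer holds `maxCount` items".

```csharp
using System;
using System.Collections.Generic;

public static class BufferRules
{
    // Hypothetical helper: replays (timestamp, value) pairs and returns the
    // batches that rules 1-4 would produce. Timestamps must be non-decreasing.
    public static List<List<T>> Simulate<T>(
        IEnumerable<(TimeSpan At, T Value)> events, TimeSpan silence, int maxCount)
    {
        var batches = new List<List<T>>();
        var buffer = new List<T>();
        var deadline = TimeSpan.Zero; // only meaningful while the buffer is non-empty

        foreach (var (at, value) in events)
        {
            // Rule 3: the countdown expired before this item arrived, so flush first.
            if (buffer.Count > 0 && at >= deadline)
            {
                batches.Add(buffer);
                buffer = new List<T>();
            }

            // Rules 1 and 2: buffer the item and (re)start the countdown.
            buffer.Add(value);
            deadline = at + silence;

            // Rule 4: flush early once the buffer is full.
            if (buffer.Count >= maxCount)
            {
                batches.Add(buffer);
                buffer = new List<T>();
            }
        }

        // End of input: whatever is left goes out when the last countdown expires.
        if (buffer.Count > 0) batches.Add(buffer);
        return batches;
    }

    // Sample run: 200 ms silence window, at most 4 items per batch.
    public static string Demo()
    {
        var ms = new (int At, int Value)[]
        {
            (0, 1), (150, 2), (300, 3),                        // each arrives within 200 ms of the last
            (600, 4), (610, 5), (620, 6), (630, 7), (640, 8)   // a burst after 300 ms of silence
        };
        var events = new List<(TimeSpan, int)>();
        foreach (var (at, v) in ms) events.Add((TimeSpan.FromMilliseconds(at), v));

        var batches = Simulate(events, TimeSpan.FromMilliseconds(200), 4);
        return string.Join(" | ", batches.ConvertAll(b => string.Join(",", b)));
    }
}
```

`BufferRules.Demo()` returns `1,2,3 | 4,5,6,7 | 8`. The first batch spans 300 ms because every arrival restarts the countdown, the second is cut short by the size limit, and the last is flushed by silence.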

IObservable<IList<T>> Buffer(this IObservable<T>, TimeSpan, int, IScheduler) looked promising, but it appears to send aggregate notifications at regular intervals, rather than starting the timer when the first notification arrives and restarting it when additional ones arrive, which is the behavior I'd like. It also sends out an empty list at the end of each time window if the source produced no notifications during it.

I do not want to drop any of the T notifications; just buffer them.

Does anything like this exist, or do I need to write my own?

dlf
  • 1
    You may be able to modify [the answer in this thread](http://stackoverflow.com/questions/4655437/how-to-implement-buffering-with-timeout-in-rx) to add the "send on buffer full". – Matthew Watson Feb 22 '16 at 16:35
  • Related: [Reactive Throttle Returning All Items Added Within The TimeSpan](https://stackoverflow.com/questions/8849810/reactive-throttle-returning-all-items-added-within-the-timespan) – Theodor Zoulias Dec 27 '20 at 00:53

3 Answers

6

Some similar questions exist on SO but not exactly like this. Here's an extension method that does the trick.

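// Requires the System.Reactive package: using System.Reactive; using System.Reactive.Linq;
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
                                          (this IObservable<TSource> source,
                                           int maxAmount, TimeSpan threshold)
{
    return Observable.Create<IList<TSource>>((obs) =>
    {
        // Put every item in one group; the group stays open until the duration observable fires.
        return source.GroupByUntil(_ => true,
                                   // Close the group after `threshold` of silence...
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         // ...or as soon as maxAmount items have arrived.
                                         .Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
                     // Each closed group is emitted as a single list.
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}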
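
A usage sketch may help show both flush triggers. It assumes the System.Reactive package is referenced; the `BufferWithThrottleDemo` class and its `Run` method are hypothetical names, and the operator is repeated in compact form only so that the sketch compiles on its own. Timing relies on the default scheduler, so the sleep is deliberately much longer than the threshold.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class BufferWithThrottleDemo
{
    // Same operator as above, repeated only so this sketch is self-contained.
    public static IObservable<IList<T>> BufferWithThrottle<T>(
        this IObservable<T> source, int maxAmount, TimeSpan threshold) =>
        source.GroupByUntil(_ => true,
                            g => g.Throttle(threshold).Select(_ => Unit.Default)
                                  .Merge(g.Buffer(maxAmount).Select(_ => Unit.Default)))
              .SelectMany(g => g.ToList());

    // Pushes four values through the operator and returns each batch as text.
    public static List<string> Run()
    {
        var batches = new List<string>();
        var subject = new Subject<int>();

        using (subject.BufferWithThrottle(3, TimeSpan.FromMilliseconds(200))
                      .Subscribe(b => { lock (batches) batches.Add(string.Join(",", b)); }))
        {
            subject.OnNext(1);
            subject.OnNext(2);
            subject.OnNext(3);   // third item reaches maxAmount and closes the group
            subject.OnNext(4);   // starts a new group
            Thread.Sleep(1000);  // silence well beyond the 200 ms threshold
        }

        lock (batches) return new List<string>(batches);
    }
}
```

Calling `BufferWithThrottleDemo.Run()` should yield two batches, `1,2,3` (closed by the size limit) followed by `4` (closed by the silence threshold). No empty lists are emitted during the quiet period.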
supertopi
  • That does appear to work. Can you explain what it's doing at all? I haven't encountered `GroupByUntil` before. – dlf Feb 22 '16 at 18:19
  • 2
    Sure. `GroupByUntil` groups incoming values until the `Observable` returned by its second parameter produces a value. In this case we put all notifications into the same group and wait for a value from either `Throttle` or `Buffer` (hence the `Merge`). The former enforces the silence threshold, the latter enforces the maximum size. – supertopi Feb 22 '16 at 19:18
  • 1
    Nice solution. I proposed a slight variation instead of using Buffer that will use less memory in cases of large buffers or a busy stream – Niall Connaughton Feb 26 '16 at 00:07
6

Interesting operator. Supertopi's answer is a good one, but there's an improvement that can be made. If maxAmount is large, and/or the rate of notifications is high, then using Buffer will burn the GC by allocating buffers that get thrown away shortly afterwards.

To close each group observable once maxAmount is reached, you don't need to capture a Buffer of all those elements just to know when it's full. Based on Supertopi's answer, you could change it slightly to the following. Instead of collecting a Buffer of maxAmount elements, it just signals after it has seen maxAmount elements on the stream.

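// Requires the System.Reactive package: using System.Reactive; using System.Reactive.Linq;
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source, int maxAmount, TimeSpan threshold)
{
    return Observable.Create<IList<TSource>>((obs) =>
    {
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         // Take completes after maxAmount items; LastAsync then
                                         // emits once, so nothing is collected just to count.
                                         .Merge(g.Take(maxAmount)
                                                 .LastAsync()
                                                 .Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}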
Niall Connaughton
2

Nice solutions. In my opinion, though, composing existing operators buys convenience, not performance.

Also, we should always return IEnumerable instead of IList. Returning the least derived type (IEnumerable) will leave you the most leeway to change the underlying implementation down the track.

Here is my version, implemented as a custom operator.

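// Requires System.Reactive; Timer here is System.Timers.Timer.
public static IObservable<IEnumerable<TValue>> BufferWithThrottle<TValue>(this IObservable<TValue> @this, int maxAmount, TimeSpan threshold)
    {
        return Observable.Create<IEnumerable<TValue>>(observer =>
        {
            // State is created per subscription; the lock is needed because
            // Elapsed is raised on a thread-pool thread.
            var gate = new object();
            var buffer = new List<TValue>();
            // AutoReset = false: one shot per countdown, and it is not started until an item arrives.
            var aTimer = new Timer(threshold.TotalMilliseconds) { AutoReset = false };
            void Flush()
            {
                aTimer.Stop();
                if (buffer.Count == 0) return; // never emit an empty batch
                var batch = buffer.ToArray();  // snapshot, so Clear() does not empty what the observer received
                buffer.Clear();
                observer.OnNext(batch);
            }
            aTimer.Elapsed += (sender, args) => { lock (gate) Flush(); };
            var subscription = @this.Subscribe(value =>
            {
                lock (gate)
                {
                    buffer.Add(value);
                    if (buffer.Count >= maxAmount)
                        Flush();
                    else
                    {
                        // Restart the countdown.
                        aTimer.Stop();
                        aTimer.Start();
                    }
                }
            },
            // Forward errors, and flush what is left before completing.
            error => { lock (gate) { aTimer.Stop(); buffer.Clear(); observer.OnError(error); } },
            () => { lock (gate) { Flush(); observer.OnCompleted(); } });
            return Disposable.Create(() =>
            {
                subscription.Dispose();
                lock (gate) { aTimer.Stop(); buffer.Clear(); }
                aTimer.Dispose();
            });
        });
    }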

In my tests against the other solutions, it used up to 30% less CPU and avoided the memory issue.

Mouse On Mars
shtse8
  • 1
    I think your answer is not complete? – Mohammad Kanan May 07 '18 at 09:15
  • @MohammadKanan Yes, I couldn't get the syntax highlighting to work in my first post, so I kept editing my answer. It should be okay now. – shtse8 May 07 '18 at 10:19
  • 2
    One benefit of composing existing operators instead of rolling your own is that it gives you high confidence in the result. Your code *looks* like it will work, but I would want to put it through a lot of testing before using it in a production environment (in the time since I asked the question, my need for this operator has disappeared). Still worth an upvote – dlf May 08 '18 at 17:04
  • I'd also quibble with changing the type from IList to IEnumerable. Yes, it gives you the flexibility to swap in something different later, but doing so may trigger bugs for the caller if they were (knowingly or unknowingly) relying on you returning a specific type. For example, if you started returning a lazily-evaluated IEnumerable when you'd been handing out List before. Sure, that's their own fault, but in general, I like to make it as difficult as possible to misuse my APIs. Your mileage may vary, but I prefer to be fairly specific about what I'm returning. – dlf May 08 '18 at 17:10
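
To make the concern in the last comment concrete, here is a small sketch of how a lazily evaluated IEnumerable can surprise a caller who assumed a materialised list. It uses only the base class library; the `LazyVsList` class and its `Demo` method are hypothetical names, and the scenario (a producer that clears and reuses its internal buffer after handing out a batch) is an assumption chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LazyVsList
{
    // Returns the item counts a caller would observe after the producer
    // has cleared its internal buffer.
    public static string Demo()
    {
        var producerBuffer = new List<int> { 1, 2, 3 };

        // Deferred: re-reads producerBuffer every time it is enumerated.
        IEnumerable<int> lazy = producerBuffer.Where(x => x > 0);

        // Materialised: an independent copy taken at this moment.
        IEnumerable<int> snapshot = producerBuffer.ToList();

        // The producer reuses its buffer for the next batch.
        producerBuffer.Clear();

        return $"{lazy.Count()} {snapshot.Count()}";
    }
}
```

`LazyVsList.Demo()` returns `0 3`: both values have the static type IEnumerable<int>, but only the snapshot still holds the batch once the producer moves on.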