
I'm using Reactive Extensions (Rx) to collate data into buffers of 100 ms:

this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100))
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);

This works fine. However, I want slightly different behavior than that provided by the Buffer operation. Essentially, I want to reset the timer if another data item is received. Only when no data has been received for the entire 100ms do I want to handle it. This opens up the possibility of never handling the data, so I should also be able to specify a maximum count. I would imagine something along the lines of:

.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)

I've had a look around and haven't been able to find anything like this in Rx. Can anyone confirm or deny this?
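To pin down the requested semantics, here is a small offline model. It is not Rx code, and the class and method names are hypothetical. It takes timestamped items and decides which ones would share a batch: every item restarts the quiet-period timer, and a batch also closes once it reaches the maximum count. Timestamps are assumed to be in milliseconds and sorted.

```csharp
using System.Collections.Generic;

// Hypothetical offline model (not Rx): decides which timestamped items would
// share a batch under the requested "SlidingBuffer" rules.
public static class SlidingBufferModel
{
    // events: (time in ms, value), assumed sorted by time.
    public static List<List<T>> Batches<T>(
        IReadOnlyList<(long TimeMs, T Value)> events, long quietMs, int maxCount)
    {
        var result = new List<List<T>>();
        var current = new List<T>();
        for (int i = 0; i < events.Count; i++)
        {
            current.Add(events[i].Value);

            // Every item restarts the quiet timer, so the batch only closes if the
            // next item is at least quietMs away (or there is no next item)...
            bool isLast = i == events.Count - 1;
            bool quietGap = isLast || events[i + 1].TimeMs - events[i].TimeMs >= quietMs;

            // ...or if it has reached the maximum count.
            if (quietGap || current.Count == maxCount)
            {
                result.Add(current);
                current = new List<T>();
            }
        }
        return result;
    }
}
```

With items at 0, 50, 120, 300 and 350 ms, a 100 ms quiet period and a maximum of 10, the model yields two batches (three items, then two). Lowering the maximum to 2 yields batches of two, one and two items.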

Kent Boogaart
  • I'm sure I saw this behaviour in one of the tutorial videos on Rx but I'm afraid I can't remember what or exactly where. :( – Chris Sep 29 '11 at 13:11
  • Ah, throttle (http://msdn.microsoft.com/en-us/library/hh229298%28v=vs.103%29.aspx) is what I was thinking of but I don't think that does what you want on its own. Not sure if there might be some way to combine it to do what is wanted... – Chris Sep 29 '11 at 13:16

6 Answers


This is possible by combining the built-in Window and Throttle methods of Observable. First, let's solve the simpler problem where we ignore the maximum count condition:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}

The powerful Window method did the heavy lifting. Now it's easy enough to see how to add a maximum count:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? max=null)
{
    var closes = stream.Throttle(delay);
    if (max != null)
    {
        var overflows = stream.Where((x,index) => index+1>=max);
        closes = closes.Merge(overflows);
    }
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}

I'll write a post explaining this on my blog. https://gist.github.com/2244036

Documentation for the Window method:

Colonel Panic
  • With the above BufferUntilInactive scenario: if the subscriber is slower than the producer, you may see a scenario where the next set of windowed items is buffered and not pushed to the subscriber until another item is generated... – Rohit Sharma Jan 25 '14 at 04:25
  • I've attached a sample: http://snipt.org/Bhao0. In Visual Studio, (1) open the Output window, (2) check the hang button, (3) click the button, (4) wait for it to print "Click now" on the console, and (5) press the button three times; you will see that those three clicks are missed. – Rohit Sharma Jan 25 '14 at 06:14

I wrote an extension to do most of what you're after - BufferWithInactivity.

Here it is:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source,
    TimeSpan inactivity,
    int maximumBufferSize)
{
    return Observable.Create<IEnumerable<T>>(o =>
    {
        var gate = new object();
        var buffer = new List<T>();
        var mutable = new SerialDisposable();
        var subscription = (IDisposable)null;
        // Scheduler.Default replaces the older Scheduler.ThreadPool, which is obsolete in later Rx versions.
        var scheduler = Scheduler.Default;

        Action dump = () =>
        {
            var bts = buffer.ToArray();
            buffer = new List<T>();
            if (o != null)
            {
                o.OnNext(bts);
            }
        };

        Action dispose = () =>
        {
            if (subscription != null)
            {
                subscription.Dispose();
            }
            mutable.Dispose();
        };

        Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted =
            onAction =>
            {
                lock (gate)
                {
                    dispose();
                    dump();
                    if (o != null)
                    {
                        onAction(o);
                    }
                }
            };

        Action<Exception> onError = ex =>
            onErrorOrCompleted(x => x.OnError(ex));

        Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());

        Action<T> onNext = t =>
        {
            lock (gate)
            {
                buffer.Add(t);
                if (buffer.Count == maximumBufferSize)
                {
                    dump();
                    mutable.Disposable = Disposable.Empty;
                }
                else
                {
                    mutable.Disposable = scheduler.Schedule(inactivity, () =>
                    {
                        lock (gate)
                        {
                            dump();
                        }
                    });
                }
            }
        };

        subscription =
            source
                .ObserveOn(scheduler)
                .Subscribe(onNext, onError, onCompleted);

        return () =>
        {
            lock (gate)
            {
                o = null;
                dispose();
            }
        };
    });
}
Enigmativity

With Reactive Extensions (Rx) 2.0, you can answer both requirements with a new Buffer overload that accepts a timeout and a maximum size:

this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100), 10000) // emit every 100 ms, or as soon as 10000 items are collected
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);

See https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx for the documentation.

  • But this won't have a sliding window, with the kind of 'debounce' behaviour that was requested? – Cocowalla Mar 29 '17 at 19:43
  • @Cocowalla I reread the original question and the code I provided does fulfill all the requirements. I have used this in production code with great success. – Sébastien Lorion Apr 10 '17 at 15:42
  • Sorry, I meant specifically the debounce behaviour: "I want to reset the timer if another data item is received" - I don't see that your code does this? AFAICS, your code will always push the buffer to the subscriber every 100ms (as long as it's not empty) – Cocowalla Apr 10 '17 at 20:07
  • I now see what you mean by debounce, though my understanding of that term is more like http://reactivex.io/documentation/operators/debounce.html, which is `Observable.Throttle`. What you ask is more complicated, but I guess it can be done with `Observable.Window`. In any case, unless I am missing something, my answer does exactly the same thing as the accepted answer on this question. – Sébastien Lorion Apr 27 '17 at 16:00
  • Nope, this answer has a different behavior than the accepted answer. The accepted answer correctly (according to the requirements) postpones emitting the buffer in case the source observable is continuously active. This answer simply emits the buffer every 100 milliseconds. – Theodor Zoulias Dec 28 '20 at 07:41
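The difference described in these comments can be seen with a small offline model. The names are hypothetical, and the model is simplified: it assumes the count is never reached and that the time windows start at t = 0. Buffer(TimeSpan) cuts time into fixed, consecutive windows, so a continuously active source is split into a new batch every 100 ms, whereas the requested behavior would keep such a stream in one batch until it goes quiet or hits the maximum count.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class FixedBufferModel
{
    // Simplified model of Buffer(timeSpan): consecutive, non-overlapping windows
    // of windowMs starting at subscription time (t = 0). Empty windows are dropped,
    // like the .Where(x => x.Count != 0) in the question.
    public static List<List<T>> Batches<T>(
        IReadOnlyList<(long TimeMs, T Value)> events, long windowMs) =>
        events.GroupBy(e => e.TimeMs / windowMs)   // which fixed window each item falls into
              .OrderBy(g => g.Key)
              .Select(g => g.Select(e => e.Value).ToList())
              .ToList();
}
```

For ten items arriving every 50 ms, this model yields five batches of two items each, even though the gap between items never reaches 100 ms.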

As Rohit Sharma mentioned in his comment on Colonel Panic's solution, there is a problem where items are buffered but not pushed to the subscriber until another item is generated.

As described in this comment, the problem is p.Window(() => closes), because it opens up a gap in which events can be missed.

That lambda is invoked after each window is processed, and the Window operator calls Subscribe on whatever the lambda returns each time, because as far as it knows, you might return a completely different IObservable from that lambda every time.

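To avoid that gap, the code below uses the Window(closes) overload, which subscribes to the boundary sequence only once. Because the same closes sequence now serves every window, the overflow counter has to be reset by hand. Without that change it would never be reset, and once maxCount had been hit, every new event would count as an overflow.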

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
{
    var publish = stream.Publish(p =>
    {
        var closes = p.Throttle(delay);

        if (maxCount != null)
        {
            Int32 i = 0;

            var overflows = p.Where(x =>
            {
                ++i;

                if (i >= maxCount)
                {
                    i = 0;
                    return true;
                }

                return false;
            });

            closes = closes.Merge(overflows);
        }

        return p.Window(closes).SelectMany(window => window.ToList());
    });

    return publish;
}
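The counter problem can be reproduced without Rx, because LINQ to Objects has the same indexed Where overload. In this hypothetical demo, GlobalIndex mirrors the accepted answer's index-based overflow filter on a subscription that is never renewed, and ResettingCounter mirrors the counter in the code above.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class OverflowCounterDemo
{
    // Like the accepted answer's overflow filter on a subscription that is never
    // renewed: the index keeps growing, so once index + 1 reaches max,
    // every later element passes.
    public static List<int> GlobalIndex(IEnumerable<int> source, int max) =>
        source.Where((x, index) => index + 1 >= max).ToList();

    // Like the counter in the code above: it goes back to zero after each
    // overflow, so only every max-th element passes.
    public static List<int> ResettingCounter(IEnumerable<int> source, int max)
    {
        int i = 0;
        return source.Where(x =>
        {
            if (++i >= max)
            {
                i = 0;
                return true;
            }
            return false;
        }).ToList();
    }
}
```

With the numbers 1 to 10 and a maximum of 3, GlobalIndex lets 3 through 10 pass, while ResettingCounter lets only 3, 6 and 9 pass.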

Update:
After further testing, I found that in some cases items are still not pushed to the subscriber correctly.

Here is the workaround, which has been working for us for four months without any problems.

The workaround is to append .Delay(...) with any TimeSpan (TimeSpan.Zero below).

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
{
    var publish = stream.Publish(p =>
    {
        var closes = p.Throttle(delay);

        if (maxCount != null)
        {
            // Filter the published sequence p, not stream, so that the source is subscribed only once.
            var overflows = p.Where((x, index) => index + 1 >= maxCount);
            closes = closes.Merge(overflows);
        }

        return p.Window(() => closes).SelectMany(window => window.ToList()).Delay(TimeSpan.Zero);
    });

    return publish;
}
Mayr Philipp

Colonel Panic's solution is almost perfect. The only thing that is missing is a Publish component, in order to make the solution work with cold sequences too.

/// <summary>
/// Projects each element of an observable sequence into a buffer that's sent out
/// when either a given inactivity timespan has elapsed, or it's full,
/// using the specified scheduler to run timers.
/// </summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, int maxCount,
    IScheduler scheduler = default)
{
    if (maxCount < 1) throw new ArgumentOutOfRangeException(nameof(maxCount));
    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
    {
        var combinedBoundaries = Observable.Merge
        (
            published.Throttle(dueTime, scheduler),
            published.Skip(maxCount - 1)
        );

        return published
            .Window(() => combinedBoundaries)
            .SelectMany(window => window.ToList());
    });
}

Beyond adding the Publish, I've also replaced the original .Where((_, index) => index + 1 >= maxCount) with the equivalent but shorter .Skip(maxCount - 1). For completeness, there is also an IScheduler parameter, which configures the scheduler that runs the Throttle timer.
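LINQ to Objects offers the same indexed Where and Skip operators, so the claimed equivalence can be spot-checked without Rx. A minimal sketch (the class and method names are hypothetical):

```csharp
using System.Linq;

public static class SkipEquivalenceDemo
{
    // True when the indexed Where filter and Skip(maxCount - 1) keep
    // exactly the same elements of source: both drop the first
    // maxCount - 1 elements and keep everything after them.
    public static bool SameResult(int[] source, int maxCount) =>
        source.Where((_, index) => index + 1 >= maxCount)
              .SequenceEqual(source.Skip(maxCount - 1));
}
```

For a ten-element array, it returns true for every maxCount from 1 to 12, including the case where maxCount exceeds the length and both sides are empty.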

Theodor Zoulias
  • For a simpler `BufferUntilInactive` variant that does not include a `maxCount` parameter, you can look [here](https://stackoverflow.com/questions/8849810/reactive-throttle-returning-all-items-added-within-the-timespan/65464391#65464391). – Theodor Zoulias Nov 01 '21 at 09:05
  • **Caution:** there is currently [a bug](https://github.com/dotnet/reactive/issues/1846 "Items not processed, when busy processing previous items") in the `Window` operator (Rx version 5.0), that can result in values being lost. The conditions that can trigger the bug are not fully studied at the moment. – Theodor Zoulias Apr 07 '23 at 16:00

I think this can be implemented on top of the Buffer method, as shown below:

public static IObservable<IList<T>> SlidingBuffer<T>(this IObservable<T> obs, TimeSpan span, int max)
{
    // Observable.Create replaces the CreateWithDisposable method of older Rx releases.
    return Observable.Create<IList<T>>(cl =>
    {
        var acc = new List<T>();
        return obs.Buffer(span)
            .Subscribe(next =>
            {
                if (next.Count == 0) // no activity in time span
                {
                    if (acc.Count > 0)
                    {
                        cl.OnNext(acc);
                        acc = new List<T>(); // hand the list off; don't Clear() it under the subscriber
                    }
                }
                else
                {
                    acc.AddRange(next);
                    if (acc.Count >= max) // max items collected
                    {
                        cl.OnNext(acc);
                        acc = new List<T>();
                    }
                }
            },
            err => cl.OnError(err),
            () =>
            {
                if (acc.Count > 0) cl.OnNext(acc); // flush what is left on completion
                cl.OnCompleted();
            });
    });
}

NOTE: I haven't tested it, but I hope it gives you the idea.
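Because this approach only looks at whole Buffer(span) slots, the quiet period it detects is quantized. A small offline model of the idea (hypothetical names; slots assumed to start at t = 0, timestamps in ms and sorted) shows that two items 180 ms apart can still land in the same batch with a 100 ms span, because neither of their slots is empty.

```csharp
using System.Collections.Generic;

public static class SlottedBufferModel
{
    // Models the Buffer(span)-based idea: time is cut into fixed slots of spanMs
    // starting at t = 0, and the accumulator is flushed only when a whole slot
    // passes with no items, or when it reaches max items.
    public static List<List<T>> Batches<T>(
        IReadOnlyList<(long TimeMs, T Value)> events, long spanMs, int max)
    {
        var result = new List<List<T>>();
        var acc = new List<T>();
        if (events.Count == 0) return result;

        long lastSlot = events[events.Count - 1].TimeMs / spanMs;
        int next = 0;
        for (long slot = 0; slot <= lastSlot; slot++)
        {
            // Collect the items that fall into this slot.
            var slotItems = new List<T>();
            while (next < events.Count && events[next].TimeMs / spanMs == slot)
                slotItems.Add(events[next++].Value);

            if (slotItems.Count == 0)
            {
                // An empty slot counts as "no activity": flush if anything is pending.
                if (acc.Count > 0) { result.Add(acc); acc = new List<T>(); }
            }
            else
            {
                acc.AddRange(slotItems);
                if (acc.Count >= max) { result.Add(acc); acc = new List<T>(); }
            }
        }
        if (acc.Count > 0) result.Add(acc); // source completes: flush the rest
        return result;
    }
}
```

With items at 10, 190 and 390 ms, a 100 ms span and a large max, the model yields the batches [1, 2] and [3]. A timer restarted on every item would instead have emitted each item on its own, since every gap is longer than 100 ms.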

Ankur