11

Given an IObservable<T>, is there a way to use Throttle behaviour (reset a timer each time an item is added) but have it return a collection of all the items added within that time?

Buffer provides similar functionality in that it chunks the data up into an IList<T> on every time span or count. But I need that time to reset each time an item is added.

I've seen a similar question here, Does reactive extensions support rolling buffers?, but the answers don't seem ideal and it's a little old, so I wondered if the release version of Rx-Main now supports this functionality out of the box.

Theodor Zoulias
RichK
  • It sounds like my `BufferWithInactivity` answer in http://stackoverflow.com/a/7604825/259769 is what you're asking for. Can you clarify your question please? – Enigmativity Jan 13 '12 at 14:14
  • @Enigmativity It is, it's exactly the functionality I'm after. I referenced that question in my question :) But I don't like that answer, the answerer has explicitly stated it's work in progress. – RichK Jan 13 '12 at 14:25
  • Not sure what you're asking. If the timer gets reset every time an item is "added" (propagated?) how will there be anything to buffer in the first place? – Asti Jan 15 '12 at 21:55

3 Answers

14

As I answered in the other post, yes you can! Using the Throttle and Window methods of Observable:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}
Community
Colonel Panic
  • Nice answer! But shouldn't one publish the stream using `return stream.Publish(hot => ...)`, to avoid subscribing twice to cold observables? – Ziriax Feb 15 '17 at 20:57
4

I amended Colonel Panic's BufferUntilInactive operator by adding a Publish component, so that it works correctly with cold observables too:

/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers, which are produced based on time and activity,
/// using the specified scheduler to run timers.</summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
{
    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
        published
            .Window(() => published.Throttle(dueTime, scheduler))
            .SelectMany(window => window.ToList())
    );
}

For completeness I've also added an optional IScheduler parameter, which configures the scheduler where the timer is run.
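
To see the rolling behaviour without waiting on real timers, here is a minimal sketch that drives the operator above with virtual time. It assumes the System.Reactive package and uses its `HistoricalScheduler`; the `Demo` class, the `Run` method, and the specific timings are only illustrative names and values, not part of the original answer. The operator is repeated so the snippet is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BufferUntilInactiveExtensions
{
    // Same operator as above, repeated so this snippet compiles on its own.
    public static IObservable<IList<T>> BufferUntilInactive<T>(
        this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
    {
        scheduler ??= Scheduler.Default;
        return source.Publish(published =>
            published
                .Window(() => published.Throttle(dueTime, scheduler))
                .SelectMany(window => window.ToList()));
    }
}

// Hypothetical demo harness (illustrative only).
public static class Demo
{
    public static List<string> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced manually
        var source = new Subject<int>();
        var batches = new List<string>();

        using var subscription = source
            .BufferUntilInactive(TimeSpan.FromSeconds(1), scheduler)
            .Subscribe(batch => batches.Add(string.Join(",", batch)));

        source.OnNext(1);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500)); // under 1s: timer resets on next item
        source.OnNext(2);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500)); // under 1s again
        source.OnNext(3);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2));        // 1s of silence passes: buffer closes
        source.OnNext(4);
        source.OnCompleted();                                // completion flushes the last buffer

        return batches;
    }
}
```

In this run the first three items arrive less than one second apart, so they should come out together as one list, and the fourth item is flushed as its own list when the source completes. Passing the scheduler explicitly is what makes the timing deterministic here; with the default scheduler the same pattern runs on real timers.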

Theodor Zoulias
  • For a `BufferUntilInactive` variant that also has a `maxCount` parameter, you can look [here](https://stackoverflow.com/questions/7597773/does-reactive-extensions-support-rolling-buffers/69791552#69791552). – Theodor Zoulias Nov 01 '21 at 09:04
  • **Caution:** there is currently [a bug](https://github.com/dotnet/reactive/issues/1846 "Items not processed, when busy processing previous items") in the `Window` operator (Rx version 5.0), that can result in values being lost. The conditions that can trigger the bug are not fully studied at the moment. – Theodor Zoulias Apr 07 '23 at 16:00
0

Wouldn't it work with

Observable.BufferWithTimeOrCount<TSource> Method (IObservable<TSource>, TimeSpan, Int32)?

Oliver
  • Nope, the time component will kick in immediately and buffer everything until that time has elapsed, and the count will buffer n items. I need the buffer time to reset when an item is added (like Throttle). – RichK Jan 13 '12 at 13:01