39

I would like to set up an Rx subscription that can respond to an event right away, and then ignore subsequent events that happen within a specified "cooldown" period.

The built-in Throttle and Buffer operators emit only after the timeout has elapsed, which is not quite what I need.

Here is some code that sets up the scenario, and uses a Throttle (which isn't the solution I want):

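using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

class Program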
{
    static Stopwatch sw = new Stopwatch();

    static void Main(string[] args)
    {
        var subject = new Subject<int>();
        var timeout = TimeSpan.FromMilliseconds(500);

        subject
            .Throttle(timeout)
            .Subscribe(DoStuff);

        var factory = new TaskFactory();
         
        sw.Start();

        factory.StartNew(() =>
        {
            Console.WriteLine("Batch 1 (no delay)");
            subject.OnNext(1);
        });

        factory.StartNewDelayed(1000, () =>
        {
            Console.WriteLine("Batch 2 (1s delay)");
            subject.OnNext(2);
        });
 
        factory.StartNewDelayed(1300, () =>
        {
            Console.WriteLine("Batch 3 (1.3s delay)");
            subject.OnNext(3);
        });

        factory.StartNewDelayed(1600, () =>
        {
            Console.WriteLine("Batch 4 (1.6s delay)");
            subject.OnNext(4);
        });

        Console.ReadKey();
        sw.Stop();
    }

    private static void DoStuff(int i)
    {
        Console.WriteLine("Handling {0} at {1}ms", i, sw.ElapsedMilliseconds);
    }
}

The output of running this right now is:

Batch 1 (no delay)

Handling 1 at 508ms

Batch 2 (1s delay)

Batch 3 (1.3s delay)

Batch 4 (1.6s delay)

Handling 4 at 2114ms

Note that Batch 2 isn't handled (which is fine!) because Throttle only emits a value after 500ms have passed with no newer value, and Batch 3 arrives 300ms after it. Batch 3 is also not handled, for the same reason: Batch 4 arrives 300ms later. The values that are handled are also delayed by the timeout: Batch 1 is handled at 508ms and Batch 4 at 2114ms.

What I'm looking for is something more like this:

Batch 1 (no delay)

Handling 1 at ~0ms

Batch 2 (1s delay)

Handling 2 at ~1000ms

Batch 3 (1.3s delay)

Batch 4 (1.6s delay)

Handling 4 at ~1600ms

Note that batch 3 wouldn't be handled in this scenario (which is fine!) because it occurs within 500ms of Batch 2.
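The rule being asked for can be written down without Rx at all. The sketch below is only a model of the desired behaviour, not an Rx operator: `CooldownModel` and `HandledTimes` are hypothetical names, arrival times are plain millisecond offsets, and treating an event that arrives exactly at the end of the cooldown as handled is an assumption.

```csharp
using System;
using System.Collections.Generic;

public static class CooldownModel
{
    // Returns the arrival times (in ms) that would be handled. An event passes
    // only if at least cooldownMs have elapsed since the last *handled* event.
    // Assumes arrivalsMs is sorted in ascending order.
    public static List<long> HandledTimes(IEnumerable<long> arrivalsMs, long cooldownMs)
    {
        var handled = new List<long>();
        long? lastHandled = null; // nothing handled yet, so the first event always passes

        foreach (var t in arrivalsMs)
        {
            if (lastHandled == null || t - lastHandled.Value >= cooldownMs)
            {
                handled.Add(t);
                lastHandled = t; // the cooldown restarts from this event
            }
        }

        return handled;
    }
}
```

For the arrivals in the question (0, 1000, 1300 and 1600ms with a 500ms cooldown), this model handles the events at 0, 1000 and 1600ms and drops the one at 1300ms.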

EDIT:

Here is the implementation for the "StartNewDelayed" extension method that I use:

/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <returns>A Task that will be completed after the specified duration.</returns>
public static Task StartNewDelayed(
    this TaskFactory factory, int millisecondsDelay)
{
    return StartNewDelayed(factory, millisecondsDelay, CancellationToken.None);
}

/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the timed task.</param>
/// <returns>A Task that will be completed after the specified duration and that's cancelable with the specified token.</returns>
public static Task StartNewDelayed(this TaskFactory factory, int millisecondsDelay, CancellationToken cancellationToken)
{
    // Validate arguments
    if (factory == null) throw new ArgumentNullException("factory");
    if (millisecondsDelay < 0) throw new ArgumentOutOfRangeException("millisecondsDelay");

    // Create the timed task
    var tcs = new TaskCompletionSource<object>(factory.CreationOptions);
    var ctr = default(CancellationTokenRegistration);

    // Create the timer but don't start it yet.  If we start it now,
    // it might fire before ctr has been set to the right registration.
    var timer = new Timer(self =>
    {
        // Clean up both the cancellation token and the timer, and try to transition to completed
        ctr.Dispose();
        ((Timer)self).Dispose();
        tcs.TrySetResult(null);
    });

    // Register with the cancellation token.
    if (cancellationToken.CanBeCanceled)
    {
        // When cancellation occurs, cancel the timer and try to transition to cancelled.
        // There could be a race, but it's benign.
        ctr = cancellationToken.Register(() =>
        {
            timer.Dispose();
            tcs.TrySetCanceled();
        });
    }

    if (millisecondsDelay > 0)
    {
        // Start the timer and hand back the task...
        timer.Change(millisecondsDelay, Timeout.Infinite);
    }
    else
    {
        // Just complete the task, and keep execution on the current thread.
        ctr.Dispose();
        tcs.TrySetResult(null);
        timer.Dispose();
    }

    return tcs.Task;
}
Andrew Anderson
  • Related: [How to throttle event stream using RX?](https://stackoverflow.com/questions/3211134/how-to-throttle-event-stream-using-rx) – Theodor Zoulias Dec 26 '20 at 17:02

9 Answers

18

Here's my approach. It's similar to others that have gone before, but it doesn't suffer the over-zealous window production problem.

The desired function works a lot like Observable.Throttle but emits qualifying events as soon as they arrive rather than delaying for the duration of the throttle or sample period. For a given duration after a qualifying event, subsequent events are suppressed.

Given as a testable extension method:

public static class ObservableExtensions
{
    public static IObservable<T> SampleFirst<T>(
        this IObservable<T> source,
        TimeSpan sampleDuration,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        return source.Publish(ps => 
            ps.Window(() => ps.Delay(sampleDuration,scheduler))
              .SelectMany(x => x.Take(1)));
    }
}

The idea is to use the overload of Window that creates non-overlapping windows, with a windowClosingSelector built from the source delayed by sampleDuration. Each window is therefore (a) closed sampleDuration after the first element that arrives in it and (b) kept open until then, so it swallows every element that arrives during the cooldown. We then simply take the first element from each window.

Rx 1.x Version

The Publish extension method used above is not available in Rx 1.x. Here is an alternative:

public static class ObservableExtensions
{
    public static IObservable<T> SampleFirst<T>(
        this IObservable<T> source,
        TimeSpan sampleDuration,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        var sourcePub = source.Publish().RefCount();
        return sourcePub.Window(() => sourcePub.Delay(sampleDuration,scheduler))
                        .SelectMany(x => x.Take(1));
    }
}
James World
  • I read about Publish here: http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html#PublishAndConnect. Is it really needed in SampleFirst? – Mauro Sampietro Nov 28 '14 at 12:47
  • @sam Yes, a correct implementation is needed since we are making two subscriptions to the source. You should always write operators in such a way that the number of subscriptions is not "surprising". If the observable was cold or had other side-effects, then not publishing could break the implementation or cause undesired side-effects. We also want to ensure the element used in the Delay part is the same element that opened the window thanks to the multicast of publish. Otherwise you may have timing issues. – James World Nov 28 '14 at 13:16
  • @sam Also note that at the time that link was written, the lambda overload of `Publish` did not exist. This superb addition to the API has made handling this sort of scenario far, far easier. – James World Nov 28 '14 at 13:18
  • If using Rx 1.0 you will need `Publish().RefCount()` instead of the newer style of `Publish` – James World Feb 04 '15 at 15:08
  • I used this solution for a long time, but when I investigated some unexpectedly high CPU and memory usage, I found that this extension method was involved. Each item is delayed, so it stays alive rather than being discarded; with a lot of input, I think the delayed items can accumulate. Can you confirm this, @James World? – Mauro Sampietro May 13 '16 at 12:26
8

The solution I found after a lot of trial and error was to replace the throttled subscription with the following:

subject
    .Window(() => { return Observable.Interval(timeout); })
    .SelectMany(x => x.Take(1))
    .Subscribe(i => DoStuff(i));

Edited to incorporate Paul's clean-up.

Andrew Anderson
4

Awesome solution Andrew! We can take this a step further though and clean up the inner Subscribe:

subject
    .Window(() => { return Observable.Interval(timeout); })
    .SelectMany(x => x.Take(1))
    .Subscribe(DoStuff);
Ana Betts
2

The initial answer I posted has a flaw: when Window is used with Observable.Interval to mark the end of each window, it sets up an endless series of back-to-back 500ms windows starting at subscription. What I really need is a window that opens when the first value is pushed into the subject and closes 500ms later.

My sample data masked this problem because the data broke down nicely into the windows that were already going to be created. (i.e. 0-500ms, 501-1000ms, 1001-1500ms, etc.)

Consider instead this timing:

factory.StartNewDelayed(300,() =>
{
    Console.WriteLine("Batch 1 (300ms delay)");
    subject.OnNext(1);
});

factory.StartNewDelayed(700, () =>
{
    Console.WriteLine("Batch 2 (700ms delay)");
    subject.OnNext(2);
});

factory.StartNewDelayed(1300, () =>
{
    Console.WriteLine("Batch 3 (1.3s delay)");
    subject.OnNext(3);
});

factory.StartNewDelayed(1600, () =>
{
    Console.WriteLine("Batch 4 (1.6s delay)");
    subject.OnNext(4);
});

What I get is:

Batch 1 (300ms delay)

Handling 1 at 356ms

Batch 2 (700ms delay)

Handling 2 at 750ms

Batch 3 (1.3s delay)

Handling 3 at 1346ms

Batch 4 (1.6s delay)

Handling 4 at 1644ms

This is because the windows begin at 0ms, 500ms, 1000ms, and 1500ms and so each Subject.OnNext fits nicely into its own window.
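The arithmetic behind this can be checked with a small idealised model. It is not Rx code: `FixedWindowModel` is a hypothetical name, and the model assumes the windows sit on an exact 500ms grid measured from subscription and ignores timer drift.

```csharp
using System;
using System.Collections.Generic;

public static class FixedWindowModel
{
    // Idealised model of Window(() => Observable.Interval(window)) followed by
    // Take(1) on each window: windows are [0, w), [w, 2w), ... measured from
    // subscription, and only the first arrival in each window is handled.
    // Assumes arrivalsMs is non-negative and sorted in ascending order.
    public static List<long> HandledTimes(IEnumerable<long> arrivalsMs, long windowMs)
    {
        var handled = new List<long>();
        long lastWindow = -1; // index of the window that last produced a value

        foreach (var t in arrivalsMs)
        {
            long window = t / windowMs; // index of the fixed window containing t
            if (window != lastWindow)
            {
                handled.Add(t);
                lastWindow = window;
            }
        }

        return handled;
    }
}
```

With a 500ms window, arrivals at 300, 700, 1300 and 1600ms fall into windows 0, 1, 2 and 3, so all four are handled. The original timing (0, 1000, 1300, 1600ms) falls into windows 0, 2, 2 and 3, which is why it happened to produce the desired result.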

What I want is:

Batch 1 (300ms delay)

Handling 1 at ~300ms

Batch 2 (700ms delay)

Batch 3 (1.3s delay)

Handling 3 at ~1300ms

Batch 4 (1.6s delay)

After a lot of struggling and an hour banging on it with a co-worker, we arrived at a better solution using pure Rx and a single local variable:

bool isCoolingDown = false;

subject
    .Where(_ => !isCoolingDown)
    .Subscribe(
    i =>
    {
        DoStuff(i);

        isCoolingDown = true;

        Observable
            .Interval(cooldownInterval)
            .Take(1)
            .Subscribe(_ => isCoolingDown = false);
    });

Our assumption is that calls to the subscription method are synchronized. If they are not, then a simple lock could be introduced.
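If that assumption does not hold, one way to add the lock is sketched below. This is not the code above with a lock bolted on: it swaps the timer-reset flag for a timestamp comparison so that the check and the update happen inside a single lock. `CooldownGate` and `TryEnter` are hypothetical names, and the caller supplies the current time.

```csharp
using System;

public sealed class CooldownGate
{
    private readonly object _sync = new object();
    private readonly TimeSpan _cooldown;
    private DateTimeOffset? _lastAccepted; // null until the first event is accepted

    public CooldownGate(TimeSpan cooldown)
    {
        _cooldown = cooldown;
    }

    // Returns true if an event observed at 'now' may be handled, and starts a
    // new cooldown in that case. The check and the update are done under one
    // lock, so two concurrent callers cannot both pass during a cooldown.
    public bool TryEnter(DateTimeOffset now)
    {
        lock (_sync)
        {
            if (_lastAccepted.HasValue && now - _lastAccepted.Value < _cooldown)
            {
                return false; // still cooling down
            }

            _lastAccepted = now;
            return true;
        }
    }
}
```

Under these assumptions the subscription would become something like `subject.Where(_ => gate.TryEnter(DateTimeOffset.UtcNow)).Subscribe(DoStuff)`, with `gate` created once outside the query.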

Andrew Anderson
1

Use .Scan()! This is what I use for throttling when I need the first hit (after a quiet period) handled immediately, while delaying (and grouping/ignoring) any subsequent hits. It works like Throttle, but fires immediately if the previous OnNext was at least interval ago; otherwise it schedules the value at exactly interval after the previous hit. If multiple hits arrive within the cooling-down period, the additional ones are ignored, just as with Throttle.

The difference from your use case is that if you get events at 0ms and 100ms, both will be handled (at 0ms and 500ms), which might be what you actually want. Otherwise, the accumulator is easy to adapt to ignore any hit closer than interval to the previous one.

public static IObservable<T> QuickThrottle<T>(this IObservable<T> src, TimeSpan interval, IScheduler scheduler)
{
  return src
    .Scan(new ValueAndDueTime<T>(), (prev, id) => AccumulateForQuickThrottle(prev, id, interval, scheduler))
    .Where(vd => !vd.Ignore)
    .SelectMany(sc => Observable.Timer(sc.DueTime, scheduler).Select(_ => sc.Value));
}

private static ValueAndDueTime<T> AccumulateForQuickThrottle<T>(ValueAndDueTime<T> prev, T value, TimeSpan interval, IScheduler s)
{
  var now = s.Now;

  // Ignore this completely if there is already a future item scheduled
  //  but do keep the dueTime for accumulation!
  if (prev.DueTime > now) return new ValueAndDueTime<T> { DueTime = prev.DueTime, Ignore = true };

  // Schedule this item at least one interval after the previous one
  var min = prev.DueTime + interval;
  var nextTime = (now < min) ? min : now;
  return new ValueAndDueTime<T> { DueTime = nextTime, Value = value };
}

private class ValueAndDueTime<T>
{
  public DateTimeOffset DueTime;
  public T Value;
  public bool Ignore;
}
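To make the accumulator's behaviour easier to follow, here is a scheduler-free trace of the same rules. It is only a model: `QuickThrottleModel` and `DueTimes` are hypothetical names, times are millisecond offsets, and a null entry stands for an ignored hit.

```csharp
using System;
using System.Collections.Generic;

public static class QuickThrottleModel
{
    // For each arrival, returns the time (in ms) at which it would be emitted,
    // or null if it is ignored, following the accumulator rules above.
    // Assumes arrivalsMs is sorted in ascending order.
    public static List<long?> DueTimes(IEnumerable<long> arrivalsMs, long intervalMs)
    {
        var result = new List<long?>();
        long? prevDue = null; // no emission has been scheduled yet

        foreach (var now in arrivalsMs)
        {
            // An emission is already pending in the future: ignore this hit.
            if (prevDue.HasValue && prevDue.Value > now)
            {
                result.Add(null);
                continue;
            }

            // Emit now, unless that is less than one interval after the
            // previous emission, in which case defer to prevDue + interval.
            long due = now;
            if (prevDue.HasValue && now < prevDue.Value + intervalMs)
            {
                due = prevDue.Value + intervalMs;
            }

            prevDue = due;
            result.Add(due);
        }

        return result;
    }
}
```

With a 500ms interval, hits at 0, 100, 200 and 1200ms are emitted at 0ms and 500ms, ignored, and emitted at 1200ms respectively, which matches the 0ms/100ms case described above.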
Bart de Boer
0

I stumbled upon this question while trying to re-implement my own solution to the same or a similar problem using .Window. Take a look; it seems to be the same problem as this one, solved quite elegantly:

https://stackoverflow.com/a/3224723/58463

Sergey Aldoukhov
0

It's an old post, but no answer really met my needs, so here is my own solution:

public static IObservable<T> ThrottleOrImmediate<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
{
    return Observable.Create<T>((obs, token) =>
    {
        // The next item cannot be sent before this time
        DateTime nextItemTime = default;

        return Task.FromResult(source.Subscribe(async item =>
        {
            var currentTime = DateTime.Now;
            // If we have already reached the next item time
            if (currentTime - nextItemTime >= TimeSpan.Zero)
            {
                // The following item will be sent only after the set delay
                nextItemTime = currentTime + delay;
                // send current item with scheduler
                scheduler.Schedule(() => obs.OnNext(item));
            }
            // There is still time before we can send an item
            else
            {
                // we schedule the time for the following item
                nextItemTime = currentTime + delay;
                try
                {
                    await Task.Delay(delay, token);
                }
                catch (TaskCanceledException)
                {
                    return;
                }

                // If the next item time was changed by another item, we stop here
                if (nextItemTime > currentTime + delay)
                    return;
                else
                {
                    // Set next possible time for an item and send item with scheduler
                    nextItemTime = currentTime + delay;
                    scheduler.Schedule(() => obs.OnNext(item));
                }
            }
        }));

    });
}

The first item is sent immediately, and items that follow within the delay are throttled. An item that arrives after the delay has elapsed is again sent immediately.

0

The most obvious approach is to use Repeat() here. However, as far as I know, Repeat() can drop notifications that arrive between the moment the stream completes and the moment we resubscribe. In practice this has never been a problem for me.

subject
    .Take(1)
    .Concat(Observable.Empty<long>().Delay(TimeSpan.FromMilliseconds(500)))
    .Repeat();

Remember to replace long with the actual type of your source.

UPDATE:

Updated query to use Concat instead of Merge

Christoph
0

I've got another one for you. This one uses neither Repeat() nor Interval(), so it might be what you are after:

subject
    .Window(() => Observable.Timer(TimeSpan.FromMilliseconds(500)))
    .SelectMany(x => x.Take(1));
Christoph
  • The problem with using Window is that it starts immediately, and then ends 500ms later (in your example). The problem with this is documented in the accepted solution, above. Briefly though: I need a timer/window that starts when the sequence receives a value. – Andrew Anderson Nov 10 '11 at 19:25
  • Hmm, did you try the other solution, which is based on Repeat(), as well? I think there must be an easy way to do this without introducing state variables or building complex extension methods of your own. It seems to be a perfectly valid use case. – Christoph Nov 12 '11 at 00:35