
I'm trying to "throttle" an IObservable in (what I think is) a different way from the standard throttle methods.
I want to ignore values for 1s following the first non-ignored value in the stream.

For example, if 1s=5 dashes

source: --1-23--45-----678901234
result: --1-----4------6----1---

Any ideas on how to achieve this?

pauloya
  • possible duplicate of [Rx: How can I respond immediately, and throttle subsequent requests](http://stackoverflow.com/questions/7999503/rx-how-can-i-respond-immediately-and-throttle-subsequent-requests) – James World Mar 04 '15 at 15:00
  • @JamesWorld That's exactly the same question, thanks! I voted to close. – pauloya Mar 04 '15 at 17:04
  • @JamesWorld Correction: It's NOT exactly the same question, I want my period of time to start when the first element comes in. While the other question just wants to get a value every second. In my example, "6" is a bit phased out, which makes the last number be "1", instead of "9" – pauloya Mar 04 '15 at 17:18
  • It really is the same question, and I've used exactly the same answer I gave in that question, with a test scenario mimicking your example exactly. – James World Mar 04 '15 at 20:34

2 Answers


Here is an idiomatic way to do this in Rx, as an extension method - an explanation and example using your scenario follows.

The desired function works a lot like Observable.Throttle but emits qualifying events as soon as they arrive rather than delaying for the duration of the throttle or sample period. For a given duration after a qualifying event, subsequent events are suppressed:

public static IObservable<T> SampleFirst<T>(
    this IObservable<T> source,
    TimeSpan sampleDuration,
    IScheduler scheduler = null)
{
    scheduler = scheduler ?? Scheduler.Default;
    return source.Publish(ps => 
        ps.Window(() => ps.Delay(sampleDuration,scheduler))
          .SelectMany(x => x.Take(1)));
}

The idea is to use the overload of Window that creates non-overlapping windows using a windowClosingSelector that uses the source time-shifted back by the sampleDuration. Each window will therefore: (a) be closed by the first element in it and (b) remain open until a new element is permitted. We then simply select the first element from each window.

In the following example, I have reproduced your test scenario exactly, modelling one "dash" as 100 ticks. Note that the delay is specified as 499 ticks rather than 500, because passing events between multiple schedulers causes 1-tick drifts. In practice you wouldn't need to dwell on this, as single-tick resolution is unlikely to be meaningful. The ReactiveTest class and OnNext helper methods are made available by including the Rx testing framework NuGet package rx-testing:

public class Tests : ReactiveTest
{
    public void Scenario()
    {
        var scheduler = new TestScheduler();
        var test = scheduler.CreateHotObservable<int>(    
            // set up events as per the OP scenario
            // using 1 dash = 100 ticks        
            OnNext(200, 1),
            OnNext(400, 2),
            OnNext(500, 3),
            OnNext(800, 4),
            OnNext(900, 5),
            OnNext(1500, 6),
            OnNext(1600, 7),
            OnNext(1700, 8),
            OnNext(1800, 9),
            OnNext(1900, 0),
            OnNext(2000, 1),
            OnNext(2100, 2),
            OnNext(2200, 3),
            OnNext(2300, 4)
            );

        test.SampleFirst(TimeSpan.FromTicks(499), scheduler)
            .Timestamp(scheduler)
            .Subscribe(x =>  Console.WriteLine(
                "Time: {0} Value: {1}", x.Timestamp.Ticks, x.Value));

        scheduler.Start();

    }
}

Note that output is as per your scenario:

Time: 200 Value: 1
Time: 800 Value: 4
Time: 1500 Value: 6
Time: 2000 Value: 1
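
As a cross-check that does not depend on Rx at all, the same rule can be modelled with a plain loop over timestamped values. This is only a sketch: `SampleFirstModel` and `Apply` are hypothetical names introduced for illustration, and the model compares exact tick values, so it uses a duration of 500 rather than the 499 needed above to absorb scheduler drift.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical offline model of the rule, not part of Rx.
public static class SampleFirstModel
{
    // Keeps an event only if at least `duration` ticks have elapsed
    // since the last kept event; the very first event is always kept.
    public static List<(long Time, int Value)> Apply(
        IEnumerable<(long Time, int Value)> events, long duration)
    {
        var kept = new List<(long Time, int Value)>();
        long? lastKept = null;
        foreach (var e in events)
        {
            if (lastKept == null || e.Time - lastKept.Value >= duration)
            {
                kept.Add(e);
                lastKept = e.Time;
            }
        }
        return kept;
    }
}

public static class SampleFirstModelDemo
{
    public static void Main()
    {
        // Same events as the test scenario above (1 dash = 100 ticks).
        var events = new (long Time, int Value)[]
        {
            (200, 1), (400, 2), (500, 3), (800, 4), (900, 5),
            (1500, 6), (1600, 7), (1700, 8), (1800, 9), (1900, 0),
            (2000, 1), (2100, 2), (2200, 3), (2300, 4)
        };

        foreach (var e in SampleFirstModel.Apply(events, 500))
            Console.WriteLine("Time: {0} Value: {1}", e.Time, e.Value);
    }
}
```

Running the model over the scenario keeps the values at ticks 200, 800, 1500 and 2000, which matches the output listed above.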
James World
  • This is great! Definitely does what was required. A question, would it work if .Window() received a TimeSpan instead of a Delay? – pauloya Mar 05 '15 at 23:56
  • It does work if you do that, but you get continuously created windows. It doesn't affect the output, but it does affect the performance profile. You can see the impact if you try the above test with the code modified to use a plain `TimeSpan`. The test will never complete unless you add an `OnCompleted` to the source. That's why I used this mechanism - the stream isn't "busy" when the source is quiet. – James World Mar 06 '15 at 00:44

This should do the trick. There may be a shorter implementation.

The accumulator in the Scan stores the Timestamp of the last kept Item and marks whether to Keep each item.

public static IObservable<T> RateLimit<T>(this IObservable<T> source, TimeSpan duration)
{
    return source
        .Timestamp()
        .Scan(
            new
            {
                Item = default(T),
                Timestamp = DateTimeOffset.MinValue,
                Keep = false
            },
            (a, x) =>
            {
                var keep = a.Timestamp + duration <= x.Timestamp;
                return new
                {
                    Item = x.Value,
                    Timestamp = keep ? x.Timestamp : a.Timestamp,
                    Keep = keep
                };
            }
        )
        .Where(a => a.Keep)
        .Select(a => a.Item);
}
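
To exercise this operator against virtual time, one option is a variant that accepts a scheduler and passes it to Timestamp. The following is a sketch under stated assumptions rather than the method above: the extra `scheduler` parameter and the `RateLimitDemo` class are introduced here for illustration. It also tracks the last kept timestamp as a nullable value, because under a TestScheduler the clock starts near DateTimeOffset.MinValue, so seeding with MinValue would suppress any first item that arrives less than one duration after the start. It assumes the Rx testing package (namespace Microsoft.Reactive.Testing) is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class RateLimitExtensions
{
    // Assumed variant: same Scan-based idea, with an explicit scheduler
    // so timestamps come from virtual time in tests.
    public static IObservable<T> RateLimit<T>(
        this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
    {
        return source
            .Timestamp(scheduler)
            .Scan(
                new
                {
                    Item = default(T),
                    LastKept = (DateTimeOffset?)null, // null until something is kept
                    Keep = false
                },
                (a, x) =>
                {
                    // Keep the first item, or any item at least `duration`
                    // after the last kept one.
                    var keep = a.LastKept == null
                        || a.LastKept.Value + duration <= x.Timestamp;
                    return new
                    {
                        Item = x.Value,
                        LastKept = keep ? (DateTimeOffset?)x.Timestamp : a.LastKept,
                        Keep = keep
                    };
                })
            .Where(a => a.Keep)
            .Select(a => a.Item);
    }
}

public class RateLimitDemo : ReactiveTest
{
    // Replays the question's scenario (1 dash = 100 ticks) and
    // returns the values that pass through.
    public static List<int> Run()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateHotObservable(
            OnNext(200, 1), OnNext(400, 2), OnNext(500, 3),
            OnNext(800, 4), OnNext(900, 5), OnNext(1500, 6),
            OnNext(1600, 7), OnNext(1700, 8), OnNext(1800, 9),
            OnNext(1900, 0), OnNext(2000, 1), OnNext(2100, 2),
            OnNext(2200, 3), OnNext(2300, 4));

        var kept = new List<int>();
        source.RateLimit(TimeSpan.FromTicks(500), scheduler)
              .Subscribe(kept.Add);

        scheduler.Start();
        return kept;
    }
}
```

Because the timestamps here are read from the same scheduler that delivers the events, the comparison is exact and a duration of 500 ticks can be used directly.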
Timothy Shields