Here is an idiomatic way to do this in Rx, as an extension method - an explanation and example using your scenario follows.
The desired function works a lot like Observable.Throttle, but it emits qualifying events as soon as they arrive rather than delaying them for the duration of the throttle or sample period. For a given duration after a qualifying event, subsequent events are suppressed:
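using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Extension methods must live in a static class; the class name here is arbitrary.
public static class SampleFirstExtensions
{
    public static IObservable<T> SampleFirst<T>(
        this IObservable<T> source,
        TimeSpan sampleDuration,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        // Publish shares one subscription between Window and the delayed closing stream.
        return source.Publish(ps =>
            ps.Window(() => ps.Delay(sampleDuration, scheduler))
              .SelectMany(x => x.Take(1)));
    }
}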
The idea is to use the overload of Window that creates non-overlapping windows from a windowClosingSelector, which here is the source delayed by sampleDuration. Each window will therefore: (a) be closed by the first element in it, once that element has been delayed by sampleDuration, and (b) remain open until a new element is permitted. We then simply select the first element from each window, so anything else that arrives while the window is open is suppressed.
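To make the suppression rule concrete, here is a rough sketch of the same behaviour written as an explicit stateful filter instead of Window. This is a different technique from the one above, not a restatement of it: the names SampleFirstExplicit, SampleFirstSketch and Run are made up for illustration, and HistoricalScheduler is used only so the demo can step virtual time by hand.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SampleFirstSketch
{
    // Hypothetical alternative: let an element through only if at least
    // sampleDuration has passed since the last element that was let through.
    public static IObservable<T> SampleFirstExplicit<T>(
        this IObservable<T> source,
        TimeSpan sampleDuration,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        // Defer gives every subscriber its own lastEmitted state.
        return Observable.Defer(() =>
        {
            DateTimeOffset? lastEmitted = null;
            return source.Where(_ =>
            {
                var now = scheduler.Now;
                if (lastEmitted.HasValue && now - lastEmitted.Value < sampleDuration)
                    return false; // still inside the suppression period
                lastEmitted = now;
                return true;
            });
        });
    }

    // Replays the scenario (1 dash = 100 ticks) against virtual time.
    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler();
        var subject = new Subject<int>();
        var results = new List<int>();
        var events = new[]
        {
            (200L, 1), (400L, 2), (500L, 3), (800L, 4), (900L, 5),
            (1500L, 6), (1600L, 7), (1700L, 8), (1800L, 9), (1900L, 0),
            (2000L, 1), (2100L, 2), (2200L, 3), (2300L, 4)
        };

        using (subject.SampleFirstExplicit(TimeSpan.FromTicks(500), scheduler)
                      .Subscribe(x => results.Add(x)))
        {
            foreach (var (ticks, value) in events)
            {
                // Move the virtual clock to the event time, then push the value.
                scheduler.AdvanceTo(new DateTimeOffset(ticks, TimeSpan.Zero));
                subject.OnNext(value);
            }
        }
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "1, 4, 6, 1"
    }
}
```

Because this version reads the clock synchronously as each element arrives, it can use a duration of exactly 500 ticks. The trade-off is that it holds mutable state in a closure, which is why the Window-based version is the more idiomatic Rx formulation.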
In the following example, I have repeated your test scenario exactly, modelling one "dash" as 100 ticks. Note that the delay is specified as 499 ticks rather than 500, because the resolution of passing events between multiple schedulers causes 1-tick drifts; in practice you wouldn't need to dwell on this, as single-tick resolution is unlikely to be meaningful. The ReactiveTest class and OnNext helper methods are made available by including the Rx testing framework NuGet package rx-testing:
using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public class Tests : ReactiveTest
{
    public void Scenario()
    {
        var scheduler = new TestScheduler();
        var test = scheduler.CreateHotObservable<int>(
            // set up events as per the OP scenario
            // using 1 dash = 100 ticks
            OnNext(200, 1),
            OnNext(400, 2),
            OnNext(500, 3),
            OnNext(800, 4),
            OnNext(900, 5),
            OnNext(1500, 6),
            OnNext(1600, 7),
            OnNext(1700, 8),
            OnNext(1800, 9),
            OnNext(1900, 0),
            OnNext(2000, 1),
            OnNext(2100, 2),
            OnNext(2200, 3),
            OnNext(2300, 4)
        );

        // Timestamp with the test scheduler so the printed times are virtual ticks.
        test.SampleFirst(TimeSpan.FromTicks(499), scheduler)
            .Timestamp(scheduler)
            .Subscribe(x => Console.WriteLine(
                "Time: {0} Value: {1}", x.Timestamp.Ticks, x.Value));

        // Run the virtual clock until all scheduled events have been delivered.
        scheduler.Start();
    }
}
Note that the output matches your scenario:
Time: 200 Value: 1
Time: 800 Value: 4
Time: 1500 Value: 6
Time: 2000 Value: 1