I made a extension method:
public static IObservable<T> RateLimit<T>(this IObservable<T> source,
TimeSpan minDelay)
{
return
source.TimeInterval()
.Select(
(x, i) =>
Observable.Return(x.Value)
.Delay(i == 0
? TimeSpan.Zero
: TimeSpan.FromTicks(
Math.Max(minDelay.Ticks - x.Interval.Ticks, 0))))
.Concat();
}
this creates a new observable that only lets items through with a minimum separation in time.
To remove initial latency, it is necessary to treat the first item differently.
As can be seen, there is a test to see if we are dealing with the first item by testing i == 0
. The problem here is that if we process more than int.MaxValue
items, this will fail.
Instead, I thought about the following sequence
var trueThenFalse = Observable.Return(true)
.Concat(Observable.Repeat(false))
and zipping it up alongside my source:
source.TimeInterval().Zip(trueThenFalse, ...
but when passing this infinite sequence to Zip, we appear to enter a tight loop where trueThenFalse
emits all items in one go (to infinity). Fail.
I could easily code around this with side-effects (a bool
in the outer scope, for instance), but this would represent a loss of purity that I would not be happy with.
Any suggestions?
EDIT
Although not quite the same behaviour, the following code exhibits some nasty traits
var trueThenFalse = Observable.Return(true)
.Concat(Observable.Repeat(false));
var src = Observable.Interval(TimeSpan.FromSeconds(1)); //never completes
src.Zip(trueThenFalse,(i,tf)=>tf).ForEach(x=>Trace.TraceInformation("{0}",x));
and eventually dies with an OOME. This is because trueThenFalse
appears to be unspooling all of its values, but they aren't consumed by the Zip in a timely fashion.