I'm looking for an observable extension method to do an inverse throttle. What I mean by this is to let the first item pass and then ignore items that follow the items within a due time.
input - due time 2
|*.*.*..*..|
output
|*......*..|
Note that this is a different question than the questions below (which are all the same). The questions below require a fixed suppression duration while I require a suppression duration that is increased every time a new item arrives too early. Visually the output of the solutions listed below is as follows:
input - due time 2
|*.*.*..*..|
output
|*...*..*..|
- How to take first occurrence and then supress events for 2 seconds (RxJS)
- How to throttle event stream using RX?
- Rx: How can I respond immediately, and throttle subsequent requests
UPDATE
I came up with the following solution, however I do not know enough about schedulers and concurrency to be sure the locking is good enough. I also don't know how to implement this method when a Scheduler
argument is added to the method.
public static IObservable<T> InverseThrottle<T>(this IObservable<T> source, TimeSpan dueTime)
{
IDisposable coolDownSupscription = null;
object subscriptionLock = new object();
return source
.Where(i =>
{
lock (subscriptionLock)
{
bool result;
if (coolDownSupscription == null)
{
result = true;
}
else
{
coolDownSupscription.Dispose();
result = false;
}
coolDownSupscription = Observable
.Interval(dueTime)
.Take(1)
.Subscribe(_ =>
{
lock (subscriptionLock)
{
coolDownSupscription = null;
}
});
return result;
}
});
}