Using Reactive Extensions, I have created a rolling buffer that caches a short history of the most recent values in a data stream, for use in a plotting application. Because the values arrive much faster than I want to display them, I would like to use the Sample(TimeSpan) method in my reactive pipeline to slow things down. However, adding it to the example below causes an exception (collection was modified) to be thrown in the WriteEnumerable method after the program has run for a while. This is obviously a threading issue related to Sample, but I'm stumped on how exactly to resolve it. I've tried passing a specific scheduler to Sample, to no avail.
Any advice?
    using System;
    using System.Collections.Generic;
    using System.Reactive;
    using System.Reactive.Linq;

    class Program
    {
        static void Main(string[] args)
        {
            Observable.Interval(TimeSpan.FromSeconds(0.1))
                .Take(500)
                .TimedRollingBuffer(TimeSpan.FromSeconds(10))
                .Sample(TimeSpan.FromSeconds(0.5))
                .Subscribe(frame => WriteEnumerable(frame));

            // Keep the process alive until the user types "exit".
            var input = "";
            while (input != "exit")
            {
                input = Console.ReadLine();
            }
        }

        private static void WriteEnumerable<T>(IEnumerable<T> enumerable)
        {
            // The "collection was modified" exception is thrown from this loop.
            foreach (T thing in enumerable)
                Console.WriteLine(thing + " " + DateTime.UtcNow);
            Console.WriteLine(Environment.NewLine);
        }
    }

    public static class Extensions
    {
        public static IObservable<IEnumerable<Timestamped<T>>> TimedRollingBuffer<T>(this IObservable<T> observable, TimeSpan timeRange)
        {
            return Observable.Create<IEnumerable<Timestamped<T>>>(
                o =>
                {
                    var queue = new Queue<Timestamped<T>>();
                    return observable.Timestamp().Subscribe(
                        tx =>
                        {
                            queue.Enqueue(tx);
                            // Drop entries that have fallen out of the time window.
                            DateTime now = DateTime.Now;
                            while (queue.Peek().Timestamp < now.Subtract(timeRange))
                                queue.Dequeue();
                            o.OnNext(queue);
                        },
                        ex => o.OnError(ex),
                        () => o.OnCompleted()
                    );
                });
        }
    }
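For reference, here is a minimal sketch of one possible workaround. It rests on an assumption I have not confirmed: that the exception arises because every notification carries the same mutable Queue instance, so Sample ends up handing that queue to the subscriber on a timer thread while the source thread is still enqueuing and dequeuing. The names SnapshotExtensions and TimedRollingBufferSnapshot are hypothetical and exist only for this illustration. The sketch emits a copy of the queue on each notification, and it computes the cutoff from the item's own timestamp rather than from DateTime.Now.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class SnapshotExtensions
{
    // Hypothetical variant of TimedRollingBuffer: same windowing idea,
    // but each notification carries an independent array copy of the queue.
    public static IObservable<IEnumerable<Timestamped<T>>> TimedRollingBufferSnapshot<T>(
        this IObservable<T> observable, TimeSpan timeRange)
    {
        return Observable.Create<IEnumerable<Timestamped<T>>>(o =>
        {
            var queue = new Queue<Timestamped<T>>();
            return observable.Timestamp().Subscribe(
                tx =>
                {
                    queue.Enqueue(tx);

                    // Measure the window from the newest item's timestamp.
                    DateTimeOffset cutoff = tx.Timestamp - timeRange;

                    // The Count check guards Peek against an empty queue.
                    while (queue.Count > 0 && queue.Peek().Timestamp < cutoff)
                        queue.Dequeue();

                    // Assumption: a copy that is never mutated afterwards
                    // can be enumerated safely on another thread.
                    o.OnNext(queue.ToArray());
                },
                ex => o.OnError(ex),
                () => o.OnCompleted());
        });
    }
}
```

In the pipeline above this would replace the .TimedRollingBuffer(TimeSpan.FromSeconds(10)) call. The cost is one array allocation per incoming value, which may or may not matter at the rates involved.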
Credit where credit is due: "Reactive Extensions sliding time window".