1

Using Reactive Extensions, I have created a rolling buffer of values that caches a small history of recent values in a data stream for use in a plotting application. Since the values arrive much faster than I am interested in displaying, I would like to use the Sample(Timespan span) method in my Reactive pipeline to slow things down. However, adding it to the sample below causes an Exception to be thrown after a bit in the WriteEnumerable method (collection was modified). This is obviously a threading issue related to Sample, but I'm stumped on how exactly to alleviate it. I've tried setting the Scheduler to use in the Sample method to no avail.

Any advice?

class Program
{
    static void Main(string[] args)
    {
        Observable.Interval(TimeSpan.FromSeconds(0.1))
            .Take(500)
            .TimedRollingBuffer(TimeSpan.FromSeconds(10))
            .Sample(TimeSpan.FromSeconds(0.5))
            .Subscribe(frame => WriteEnumerable(frame));

        var input = "";
        while (input != "exit")
        {
            input = Console.ReadLine();
        }
    }

    private static void WriteEnumerable<T>(IEnumerable<T> enumerable)
    {
        foreach (T thing in enumerable)
            Console.WriteLine(thing + " " + DateTime.UtcNow);

        Console.WriteLine(Environment.NewLine);
    }
}

public static class Extensions
{
    public static IObservable<IEnumerable<Timestamped<T>>> TimedRollingBuffer<T>(this IObservable<T> observable, TimeSpan timeRange)
    {
        return Observable.Create<IEnumerable<Timestamped<T>>>(
            o =>
            {
                var queue = new Queue<Timestamped<T>>();
                return observable.Timestamp().Subscribe(
                    tx =>
                    {
                        queue.Enqueue(tx);
                        DateTime now = DateTime.Now;
                        while (queue.Peek().Timestamp < now.Subtract(timeRange))
                            queue.Dequeue();

                        o.OnNext(queue);
                    },
                    ex => o.OnError(ex),
                    () => o.OnCompleted()
                    );
            });
    }
}

credit where credit is due: reactive extensions sliding time window

Community
  • 1
  • 1
Andrew
  • 1,482
  • 9
  • 16

1 Answers1

2

The exception is due to the fact you are modifying the queue whilst enumerating it.

The RollingBuffer implementation you cited by @Enigmativity does the right thing - you will notice in his implemention the OnNext is invoked with a ToArray() ensuring a copy of the list as it stands is dispatched to observers rather than the mutating original.

In your case, the introduction of Sample introduces concurrency (which is fine - that's what it is supposed to do) - however, you are passing the queue itself, which will mutate whilst enumeration is occurring due to this introduced concurrency. This is a bug.

In your TimedRollingBuffer, if you were to use o.OnNext(queue.ToArray()) instead of o.OnNext(queue) you wouldn't have this problem.

Community
  • 1
  • 1
James World
  • 29,019
  • 9
  • 86
  • 120