I'm using Rx.NET and I have an Observable that emits time series points (double, timestamp). Every time a new point arrives, I want to calculate the average value over, let's say, the last 30 seconds. I think I need some kind of overlapping Window/Buffer that is based on timestamps rather than on count.

I've found this topic with a SlidingWindow implementation, but I cannot figure out how to fit it to my problem.

EDIT:

Thanks to this I learned that I can use the Scan operator and buffer my points, so this basically solves the problem. But maybe there is a better way to do this?

voodoo11

1 Answer

Buffer and Window look forward; you want something that looks back. Scan is the best starting point:

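using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Extension methods must live in a static class; the class name is arbitrary.
public static class ObservableExtensions
{
    public static IObservable<List<T>> BackBuffer<T>(this IObservable<T> source, TimeSpan ts)
    {
        return BackBuffer(source, ts, Scheduler.Default);
    }

    public static IObservable<List<T>> BackBuffer<T>(this IObservable<T> source, TimeSpan ts, IScheduler scheduler)
    {
        return source
            // Stamp with the same scheduler that the age check below reads from.
            .Timestamp(scheduler)
            .Scan(new List<Timestamped<T>>(), (list, element) => list
                // Drop items older than ts, then append the new one.
                .Where(ti => scheduler.Now - ti.Timestamp <= ts)
                .Concat(Enumerable.Repeat(element, 1))
                .ToList()
            )
            // Strip the timestamps before emitting.
            .Select(list => list.Select(t => t.Value).ToList());
    }
}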

Once you have BackBuffer, or something like it, then the rest becomes easy:

source
    .BackBuffer(TimeSpan.FromMilliseconds(70))
    .Select(list => list.Average())
    .Subscribe(average => Console.WriteLine(average));
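
BackBuffer measures age by arrival time on the scheduler. If the points carry their own timestamps, as in the question, a variant could evict by event time instead. The following is only a minimal sketch under assumptions: the `Point` type, the `EventTimeWindow` class, and the `Slide` and `SlidingAverage` names are hypothetical and not part of Rx.NET, and points are assumed to arrive in timestamp order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical point type; the question only says a point is (double, timestamp).
public readonly struct Point
{
    public Point(double value, DateTimeOffset timestamp)
    {
        Value = value;
        Timestamp = timestamp;
    }

    public double Value { get; }
    public DateTimeOffset Timestamp { get; }
}

public static class EventTimeWindow
{
    // One accumulator step: keep the points that are at most `span` older than
    // the newest point, then append the newest point. The input list is not mutated.
    public static List<Point> Slide(List<Point> window, Point next, TimeSpan span)
    {
        var kept = window
            .Where(p => next.Timestamp - p.Timestamp <= span)
            .ToList();
        kept.Add(next);
        return kept;
    }

    // For every incoming point, emits the average of all points whose own
    // timestamp lies within `span` of that point's timestamp.
    public static IObservable<double> SlidingAverage(this IObservable<Point> source, TimeSpan span)
    {
        return source
            .Scan(new List<Point>(), (window, next) => Slide(window, next, span))
            .Select(window => window.Average(p => p.Value));
    }
}
```

Under those assumptions, usage would look like `points.SlidingAverage(TimeSpan.FromSeconds(30)).Subscribe(Console.WriteLine);`, where `points` is an `IObservable<Point>`. Because eviction depends only on the data, the result does not change with the scheduler or with delivery delays.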
Shlomo