My question is an extension of "Advanceable historical stream and live stream in Rx".
I want to monitor changes in my stream over various rolling time frames (hourly, daily, etc.). If there is a gap in my historical data, e.g. on the hourly time frame I have data for the last 30' and data older than 120', but nothing for the [-120', -30'] interval, I want the current change to be based on the 120' observation. The problem with the code I have so far is that, in this example, the change would be based on a value of 0, because the -60' observation is missing.
I do not know how to complete this, or whether there is a cleaner and better solution altogether.
// LINQPad "C# Program" query; needs the System.Reactive NuGet package and the
// System.Reactive, System.Reactive.Linq and System.Reactive.Concurrency namespaces.
void Main()
{
    const double bufferDuration = 8;
    var now = DateTimeOffset.Now;

    // create a historical log for testing
    var log = new List<ValueTime>
    {
        new ValueTime { Ts = now.AddMilliseconds(-5000).ToString(), Balance = 1L },
        new ValueTime { Ts = now.AddMilliseconds(-4000).ToString(), Balance = 2L },
        new ValueTime { Ts = now.AddMilliseconds(-3000).ToString(), Balance = 4L }
    };

    var scheduler = new HistoricalScheduler();
    scheduler.AdvanceTo(DateTime.Parse(log[0].Ts).AddSeconds(-0));

    // historical part of the stream
    var replay = Observable.Generate(
        log.ToObservable().GetEnumerator(),
        events => events.MoveNext(),
        events => events,
        events => events.Current.Balance,
        events => DateTime.Parse(events.Current.Ts),
        scheduler);

    // create the real time part of the stream using a timer
    var realTime = Observable.Create<long>(observer =>
    {
        var timer = Observable.Interval(TimeSpan.FromSeconds(2)).Select(x => 10 + x);
        var disposable = timer.Subscribe(x => observer.OnNext(x));
        return disposable;
    });

    // combine the two streams
    var combined = replay
        .Concat(realTime)
        .Publish()
        .RefCount();

    combined.Timestamp(scheduler).Dump("Combined stream");

    // use the real time stream to advance the historical scheduler's clock
    realTime.Subscribe(_ =>
        {
            scheduler.AdvanceTo(DateTime.Now);
        },
        ex => Console.WriteLine(ex.Message),
        () => Console.WriteLine("Done"));

    // use a rolling buffer of "bufferDuration" length
    var combinedBuffer = combined
        .RollingBuffer(TimeSpan.FromSeconds(bufferDuration), scheduler)
        .Select(x => x.Sum());

    combinedBuffer.Timestamp(scheduler).Dump($"{bufferDuration}' Rolling buffer aggregation");

    // display the values that are included in each window of the rolling buffer
    combined
        .RollingBuffer(TimeSpan.FromSeconds(bufferDuration), scheduler)
        .Select(x => string.Join(",", x))
        .Timestamp(scheduler).Dump($"{bufferDuration}' Rolling buffer lists");

    // observe the difference between two consecutive observations
    combinedBuffer.CombineWithPrevious((x, y) => y - x).Timestamp(scheduler).Dump($"{bufferDuration}' Rolling differences");

    scheduler.Start();
}

class ValueTime
{
    public long Balance;
    public string Ts;
}

static class Extensions
{
    // Emits, on every new element, the elements seen within the last "buffering" interval.
    public static IObservable<T[]> RollingBuffer<T>(
        this IObservable<T> @this,
        TimeSpan buffering, IScheduler scheduler)
    {
        return Observable.Create<T[]>(o =>
        {
            var list = new LinkedList<Timestamped<T>>();
            return @this.Timestamp(scheduler).Subscribe(tx =>
            {
                list.AddLast(tx);
                // evict by the scheduler's clock (not the wall clock), so eviction
                // uses the same time source as the timestamps
                while (list.First.Value.Timestamp < scheduler.Now.Subtract(buffering))
                {
                    list.RemoveFirst();
                }
                o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
            }, o.OnError, o.OnCompleted);
        });
    }

    // Pairs each element with its predecessor (default(TSource) for the first element).
    public static IObservable<TResult> CombineWithPrevious<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, TSource, TResult> resultSelector)
    {
        return source.Scan(
                Tuple.Create(default(TSource), default(TSource)),
                (previous, current) => Tuple.Create(previous.Item2, current))
            .Select(t => resultSelector(t.Item1, t.Item2));
    }
}
Running it in LINQPad produces the following results:
In the "8' Rolling differences" table, the 1:13:30 AM entry should have been 9 instead of 13, because I would like the value of 4, which was produced at 1:13:19 AM, to be included when calculating the difference.
It seems that the current version of the rolling buffer, or my approach as a whole, is not sufficient, and it may be distracting from what I'm trying to achieve. I cannot use a count-based buffer, as this could reach well into the past if there are holes in my observations. E.g. with a count of 2 in the following example, the 10:00:00 rolling difference would use x2 - x0, which is not the result I'm looking for.
The behavior I'm looking for is the rolling difference between the current value and the most recent value that is at least 8'' old (the one from exactly 8'' ago or, failing that, the one immediately before that point). E.g.
Ts: 9:32:00 9:36:00 10:00:00 10:00:04 10:00:08 10:00:12 10:00:16
Level: --x0------x1-------x2-------x3-------x4-------x5-------x6---
Roll D.--x0----(x1-x0)-(x2-x1)--(x3-x1)--(x4-x2)--(x5-x3)--(x6-x4)-
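To pin down the rule in the diagram, here is a minimal non-Rx sketch of the lookup I have in mind. It is only an illustration under assumptions: `RollingDifferenceSketch` and `RollingDifferences` are hypothetical names, the observations are assumed to be sorted by timestamp, and a missing baseline is treated as 0 (which matches the first column of the diagram).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class RollingDifferenceSketch
{
    // For each observation, subtract the level of the latest earlier observation
    // whose timestamp is at or before (current timestamp - window).
    // Assumption: input is sorted by Ts; with no baseline yet, 0 is used.
    public static List<long> RollingDifferences(
        IReadOnlyList<(DateTime Ts, long Level)> observations, TimeSpan window)
    {
        var result = new List<long>(observations.Count);
        int baseline = -1; // index of the latest observation that is at least `window` old

        for (int i = 0; i < observations.Count; i++)
        {
            var cutoff = observations[i].Ts - window;

            // The cutoff never moves backwards, so the baseline index only advances.
            while (baseline + 1 < i && observations[baseline + 1].Ts <= cutoff)
                baseline++;

            long reference = baseline >= 0 ? observations[baseline].Level : 0L;
            result.Add(observations[i].Level - reference);
        }
        return result;
    }
}
```

With an 8'' window and the timestamps from the diagram, the observation at 10:00:04 is compared against x1 (9:36:00), since x2 at 10:00:00 is only 4'' old at that point, while the one at 10:00:08 is compared against x2.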