
My question is an extension of "Advanceable historical stream and live stream in Rx".

I want to be able to monitor changes in my stream over various rolling time frames (hourly, daily, etc.). If I have a gap in my historical data, I want to base the current change on the last observation before the gap. For example, on the hourly time frame I might have data going back to 30' ago and then from 120' ago and earlier, i.e. I'm missing data for the [-120', -30'] interval. In that case I want to base the current change on the observation from 120' ago. The problem with the code I have so far is that in this example the change would be computed against a value of 0, since the observation from 60' ago is missing.

I do not know how to complete this, or whether there is a cleaner and better solution altogether.

void Main()
{   
    const double bufferDuration = 8;    
    var now = DateTimeOffset.Now;

    // create a historical log for testing
    var log = new List<ValueTime>
            {
                new ValueTime { Ts = now.AddMilliseconds(-5000).ToString(), Balance = 1L },
                new ValueTime { Ts = now.AddMilliseconds(-4000).ToString(), Balance = 2L },
                new ValueTime { Ts = now.AddMilliseconds(-3000).ToString(), Balance = 4L }
            };

    var scheduler = new HistoricalScheduler();

    scheduler.AdvanceTo(DateTime.Parse(log[0].Ts));

    // historical part of the stream
    var replay = Observable.Generate(
            log.ToObservable().GetEnumerator(),
            events => events.MoveNext(),
            events => events,
            events => events.Current.Balance,
            events => DateTime.Parse(events.Current.Ts),
            scheduler);

    // create the real time part of the stream using a timer
    var realTime = Observable.Create<long>(observer =>
        {
            var timer = Observable.Interval(TimeSpan.FromSeconds(2)).Select(x => 10 + x);
            var disposable = timer.Subscribe(x => observer.OnNext(x));
            return disposable;
        });

    // combine the two streams
    var combined = replay
            .Concat(realTime)
            .Publish()
            .RefCount();

    combined.Timestamp(scheduler).Dump("Combined stream");

    // use the real time stream to advance the historical scheduler's clock
    realTime.Subscribe(_ =>
        {
            scheduler.AdvanceTo(DateTime.Now);
        },
        ex => Console.WriteLine(ex.Message),
        () => Console.WriteLine("Done"));

    // use a rolling buffer of "bufferDuration" length 
    var combinedBuffer = combined
            .RollingBuffer(TimeSpan.FromSeconds(bufferDuration), scheduler)
            .Select(x => x.Sum());

    combinedBuffer.Timestamp(scheduler).Dump($"{bufferDuration}' Rolling buffer aggregation");

    // display the values that are included in each window of the rolling buffer
    combined
    .RollingBuffer(TimeSpan.FromSeconds(bufferDuration), scheduler)
    .Select(x => string.Join(",", x))
    .Timestamp(scheduler).Dump($"{bufferDuration}' Rolling buffer lists");

    // observe the difference between two consecutive observations
    combinedBuffer.CombineWithPrevious((x, y) => y - x).Timestamp(scheduler).Dump($"{bufferDuration}' Rolling differences");

    scheduler.Start();
}

class ValueTime
{
    public long Balance;
    public string Ts;
}

static class Extensions
{
    public static IObservable<T[]> RollingBuffer<T>(
        this IObservable<T> @this,
        TimeSpan buffering, IScheduler scheduler)
    {
        return Observable.Create<T[]>(o =>
        {
            var list = new LinkedList<Timestamped<T>>();
            return @this.Timestamp(scheduler).Subscribe(tx =>
            {
                list.AddLast(tx);
                while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
                {
                    list.RemoveFirst();
                }
                o.OnNext(list.Select(tx2 => tx2.Value).ToArray());

            }, o.OnError, o.OnCompleted);
        });
    }

    public static IObservable<TResult> CombineWithPrevious<TSource, TResult>(this IObservable<TSource> source,
                                                                                Func<TSource, TSource, TResult> resultSelector)
    {
        return source.Scan(
            Tuple.Create(default(TSource), default(TSource)),
            (previous, current) => Tuple.Create(previous.Item2, current))
            .Select(t => resultSelector(t.Item1, t.Item2));
    }
}

Running it in LINQPad produces the following results:

[Screenshot of the LINQPad output]

In the "8' Rolling differences" table, the 1:13:30 AM entry should have been 9 instead of 13, because I want the value of 4, which was produced at 1:13:19 AM, to be used in calculating the difference.

It seems that the current version of the rolling buffer, or the approach I’m taking for that matter, is not sufficient, and it may be distracting from what I’m trying to achieve. I cannot use a count-based buffer, as it could reach well into the past if there are holes in my observations. For example, if I were to use a count of 2 in the following example, the 10:00:00 rolling difference would use x2 – x0, which is not the result I’m looking for.

The behavior I’m looking for is the rolling difference between the current value and the most recent value that is at least 8’’ old (the value from exactly 8’’ ago or, if there is none, the last one before that point). E.g.:

Ts:    9:32:00 9:36:00 10:00:00 10:00:04 10:00:08 10:00:12 10:00:16
Level: --x0------x1-------x2-------x3-------x4-------x5-------x6--- 
Roll D.--x0----(x1-x0)-(x2-x1)--(x3-x1)--(x4-x2)--(x5-x3)--(x6-x4)-
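The rule in the diagram is an "as-of" lookup: for an observation at time t, the baseline is the latest observation whose timestamp is at or before t - 8''. Below is a minimal, Rx-free sketch of that rule over an already-sorted in-memory list, written as C# 9+ top-level statements. The function name `RollingDeltas` and the sample values are hypothetical. As in the diagram, when no earlier observation exists, the value itself is reported (i.e. a baseline of 0).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sample data matching the marble diagram: x0..x6 = 1, 2, 4, ..., 64.
var ten = new DateTime(2017, 3, 14, 10, 0, 0);
var observations = new List<(DateTime Ts, long Value)>
{
    (ten.AddMinutes(-28), 1),  //  9:32:00 x0
    (ten.AddMinutes(-24), 2),  //  9:36:00 x1
    (ten, 4),                  // 10:00:00 x2
    (ten.AddSeconds(4), 8),    // 10:00:04 x3
    (ten.AddSeconds(8), 16),   // 10:00:08 x4
    (ten.AddSeconds(12), 32),  // 10:00:12 x5
    (ten.AddSeconds(16), 64),  // 10:00:16 x6
};

var deltas = RollingDeltas(observations, TimeSpan.FromSeconds(8));
Console.WriteLine(string.Join(", ", deltas)); // prints 1, 1, 2, 6, 12, 24, 48

// For each observation, subtract the latest earlier observation whose timestamp is
// at or before (current timestamp - window). The input must be sorted by timestamp.
static List<long> RollingDeltas(IReadOnlyList<(DateTime Ts, long Value)> series, TimeSpan window)
{
    var result = new List<long>(series.Count);
    int baseline = -1; // index of the current baseline; -1 means "no baseline yet"
    for (int i = 0; i < series.Count; i++)
    {
        DateTime cutoff = series[i].Ts - window;
        // The cutoff never decreases, so the baseline index only moves forward.
        while (baseline + 1 < i && series[baseline + 1].Ts <= cutoff)
            baseline++;
        // With no baseline (x0 in the diagram) the value itself is reported.
        long reference = baseline >= 0 ? series[baseline].Value : 0L;
        result.Add(series[i].Value - reference);
    }
    return result;
}
```

Because the baseline index only moves forward, the pass is linear in the number of observations, and gaps of any length are handled the same way: the baseline simply stays on the last value before the gap.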
  • I am not sure I understand what the problem is. Your buffer duration is set to 8 seconds, but you want the buffer to include values from 11 seconds ago? – Brandon Kramer Mar 13 '17 at 14:46
  • The loss of the value '4' is already accounted for at the 1:13:28 difference, accounting for it at 1:13:30 as well would be incorrect, and inconsistent with the previous data points. – Brandon Kramer Mar 13 '17 at 14:52
  • The above code represents an effort to solve the problem of finding the rolling 8’ difference in the stream’s observations. Say that I’m trying to monitor the level of a given entity, and I use the stream “Combined” to do that. The difference of 13 at 1:13:30 AM is wrong as the value of that entity was 4 at 1:13:19 AM, and there was no observation setting it to 0 between 1:13:19 AM and 1:13:24 AM. The 1:13:30 AM 8’ difference should be 13 – 4. Maybe I’ve gone the wrong way about it solving this. I’m just looking for help to solve this problem. – Dimitri Mar 13 '17 at 21:48
  • Well, the point that I was making, is that the rolling differences is consistent with the rolling buffer. Your `RollingBuffer` method is specifically excluding all values that occurred more than 8 seconds ago. My point was that at 1:13:28 the value '4' fell out of the buffer, so at 1:13:30 there is no longer any value of '4' for the rolling difference to calculate with. Therefore, if you wish to include '4' in the value of 1:13:30, you will either need to change the buffer duration, or modify the `RollingBuffer` to include additional values. – Brandon Kramer Mar 14 '17 at 12:06
  • Additionally, I am not certain what an 'observation setting it to zero' has to do with this, since there are no such observations for any element in your code. As far as I can see, this is just a buffered stream with a period of 8 seconds. If you are trying to skip back over some gap in the data, then a time based buffer will not work for this issue, since the amount of time that data may be missing for is not predictable. You may need to consider a count based buffer? – Brandon Kramer Mar 14 '17 at 12:15
  • What are the criteria for when a value may be dropped? When must a value be kept? If it is not solely based on time, then what else is it based on? – Brandon Kramer Mar 14 '17 at 12:19
  • Well the first step is to define the logic, trying to write code before you do is just a waste. Perhaps a time based buffer that will cache the last dropped value, and show it if the current buffer is lower than the expected count? – Brandon Kramer Mar 14 '17 at 17:11
  • I was just saying that if you write code before totally figuring out the logic, you are likely to have to rewrite it. I was just trying to share my own hard learned experience regarding to jumping straight to coding. :) – Brandon Kramer Mar 14 '17 at 17:40
  • This could work. I did not use counts because the observations, even though they are scheduled on a timer, can be irregular due to crashes or bad readings. I've already edited my question to explain what I'm asking in a better way. Thank you so much for trying to help me. – Dimitri Mar 14 '17 at 17:48
  • No worries. I'm really indebted to you for staying with me. – Dimitri Mar 14 '17 at 17:48
  • Always glad to help out! – Brandon Kramer Mar 14 '17 at 17:49
  • Is there a consistent interval that data is supposed to be received in? I assume that there must be in order to detect when data is missing. – Brandon Kramer Mar 14 '17 at 17:53
  • The data is supposed to come in every minute, but I want to check the hourly, daily and weekly time-frames. – Dimitri Mar 14 '17 at 19:42
  • For larger time frames, are you planning on aggregating the data? – Brandon Kramer Mar 14 '17 at 19:43
  • No aggregation. I just want the delta, i.e. I want to compare the current level vs what the level was an hour ago; 1' from now I want to compare the new level vs what the level was 59' ago, 2' from now I want to compare that level vs what the level was 58' ago, and so on. – Dimitri Mar 14 '17 at 20:02
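Since the data is supposed to arrive once a minute but may have gaps, and the windows of interest are hourly, daily and weekly, a single "current vs. one window ago" query can also be answered by binary-searching the sorted timestamps for the last observation at or before now - window. The sketch below is an illustration under stated assumptions: unique, ascending timestamps held in arrays, C# 9+ top-level statements, and hypothetical names `IndexAtOrBefore` and `DeltaOver`. The sample data reproduces the 30'/120' gap from the question. Returning `null` when the history is shorter than the window is a design choice made here, not something from the thread.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minute readings around the gap described in the question:
// data from 180' ago up to 120' ago, nothing in (-120', -30'), then the last 30'.
var now = new DateTime(2017, 3, 14, 12, 0, 0);
var tsList = new List<DateTime>();
var valueList = new List<long>();
for (int m = 180; m >= 120; m--) { tsList.Add(now.AddMinutes(-m)); valueList.Add(1000 - m); }
for (int m = 30; m >= 0; m--) { tsList.Add(now.AddMinutes(-m)); valueList.Add(1000 - m); }
DateTime[] timestamps = tsList.ToArray();
long[] values = valueList.ToArray();

var frames = new[] { ("30'", TimeSpan.FromMinutes(30)), ("hourly", TimeSpan.FromHours(1)), ("daily", TimeSpan.FromDays(1)) };
foreach (var (name, span) in frames)
    Console.WriteLine($"{name}: {DeltaOver(timestamps, values, span)?.ToString() ?? "not enough history"}");

// Index of the last timestamp <= cutoff, or -1 if there is none.
// Assumes sortedTs is sorted ascending and contains no duplicate timestamps.
static int IndexAtOrBefore(DateTime[] sortedTs, DateTime cutoff)
{
    int idx = Array.BinarySearch(sortedTs, cutoff);
    // Exact match: use it. Otherwise ~idx is the index of the first larger element,
    // so the element just before it is the last one at or before the cutoff.
    return idx >= 0 ? idx : ~idx - 1;
}

// Latest value minus the value in force `window` earlier; null if history is too short.
static long? DeltaOver(DateTime[] sortedTs, long[] vals, TimeSpan window)
{
    if (sortedTs.Length == 0) return null;
    int last = sortedTs.Length - 1;
    int b = IndexAtOrBefore(sortedTs, sortedTs[last] - window);
    if (b < 0) return null;
    return vals[last] - vals[b];
}
```

With this data the hourly delta is 1000 - 880 = 120: the cutoff of 11:00 falls inside the gap, so the 10:00 reading (120' ago) is used instead of 0. Each query costs O(log n), which matters more for weekly windows over minute data.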

1 Answer


I would have thought that the problem was general enough that a solution would already exist, or at least that a few people would be interested in it. Anyway, after struggling with it for quite some time, mainly because of my poor understanding of Rx, I figured out that the desired behavior can be reached by using the above code as the base and:

A. Modify the rolling buffer extension method from:

list.AddLast(tx);
while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
{
    list.RemoveFirst();
}
o.OnNext(list.Select(tx2 => tx2.Value).ToArray());

to:

    list.AddLast(tx);

    var nowTime = scheduler.Now.DateTime;
    System.Reactive.Timestamped<T> el = default(System.Reactive.Timestamped<T>);
    while (list.Count > 1 && list.First.Value.Timestamp < nowTime.Subtract(buffering))
    {
        el = list.First.Value;
        list.RemoveFirst();
    }

    if (el != default(System.Reactive.Timestamped<T>) && (list.Count <= 1 || list.First.Value.Timestamp > nowTime.Subtract(buffering)))
    {
        list.AddFirst(el);
    }
    o.OnNext(list.Select(tx2 => tx2.Value).ToArray());

B. Instead of the aggregate Sum of the list's values, use the difference between the last and first elements of the list:

var combinedBuffer = combined
            .RollingBuffer(TimeSpan.FromSeconds(bufferDuration), scheduler)
            .Select(x => x.Last() - x.First());
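To check steps A and B without a scheduler, the eviction rule can be replayed as a plain function that takes "now" explicitly. This is a hypothetical, Rx-free sketch (C# 9+ top-level statements): `AddAndTrim` mirrors the removal loop and the re-insert condition from step A, but uses a bool flag instead of comparing against `default(Timestamped<T>)`. The feed (1, 2, 4 followed by 10, 11, ... on an irregular clock) is made-up sample data, and "now" is taken to be each item's own timestamp, as it would be when the HistoricalScheduler is advanced to that item.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical feed: (seconds after start, value).
var start = new DateTime(2017, 3, 14, 1, 13, 0);
var bufferDuration = TimeSpan.FromSeconds(8);
var windowBuffer = new LinkedList<(DateTime Ts, long Value)>();
var feed = new (int Sec, long Value)[] { (0, 1), (1, 2), (2, 4), (5, 10), (7, 11), (9, 12), (11, 13), (13, 14) };
var deltas = new List<long>();

foreach (var (sec, value) in feed)
{
    var ts = start.AddSeconds(sec);
    AddAndTrim(windowBuffer, (ts, value), ts, bufferDuration);                      // step A
    long delta = windowBuffer.Last.Value.Value - windowBuffer.First.Value.Value; // step B
    deltas.Add(delta);
    Console.WriteLine($"{ts:HH:mm:ss} [{string.Join(",", windowBuffer.Select(e => e.Value))}] delta={delta}");
}

// Append the item, evict items older than (now - window) while keeping at least one,
// then re-insert the newest evicted item unless an item sits exactly on the cutoff
// (or only one item is left), so that it can serve as the baseline.
static void AddAndTrim(LinkedList<(DateTime Ts, long Value)> list,
                       (DateTime Ts, long Value) item, DateTime now, TimeSpan window)
{
    list.AddLast(item);
    DateTime cutoff = now - window;
    bool evicted = false;
    (DateTime Ts, long Value) lastEvicted = default;
    while (list.Count > 1 && list.First.Value.Ts < cutoff)
    {
        lastEvicted = list.First.Value;
        list.RemoveFirst();
        evicted = true;
    }
    if (evicted && (list.Count <= 1 || list.First.Value.Ts > cutoff))
        list.AddFirst(lastEvicted);
}
```

With an 8-second window, the entry at 01:13:11 keeps 4 (from 01:13:02) as its baseline and reports 13 - 4 = 9. Two seconds later, 10 sits exactly 8 seconds back, so it becomes the baseline and the re-inserted 4 is dropped.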

C. Remove the following "CombineWithPrevious" call altogether:

combinedBuffer.CombineWithPrevious((x, y) => y - x).Timestamp(scheduler).Dump($"{bufferDuration}' Rolling differences");

The complete code, with some debugging printouts, is the following:

void Main()
{
    const double bufferDuration = 8;
    var now = DateTimeOffset.Now;

    Console.WriteLine(now);

    // create a historical log for testing
    var log = new List<ValueTime>
            {
                new ValueTime { Ts = now.AddMilliseconds(-5000).ToString(), Balance = 1L },
                new ValueTime { Ts = now.AddMilliseconds(-4000).ToString(), Balance = 2L },
                new ValueTime { Ts = now.AddMilliseconds(-3000).ToString(), Balance = 4L }
            };

    var scheduler = new HistoricalScheduler();

    scheduler.AdvanceTo(DateTime.Parse(log[0].Ts));

    // historical part of the stream
    var replay = Observable.Generate(
            log.ToObservable().GetEnumerator(),
            events => events.MoveNext(),
            events => events,
            events => events.Current.Balance,
            events => DateTime.Parse(events.Current.Ts),
            scheduler);

    // create the real time part of the stream using a timer
    var realTime = Observable.Create<long>(observer =>
        {
            var timer = Observable.Interval(TimeSpan.FromSeconds(2)).Select(x => 10 + x);
            var disposable = timer.Subscribe(x => observer.OnNext(x));
            return disposable;
        });

    // combine the two streams
    var combined = replay
            .Concat(realTime)
            .Publish()
            .RefCount();

    combined.Timestamp(scheduler).Dump("Combined stream");

    // use the real time stream to advance the historical scheduler's clock
    realTime.Subscribe(_ =>
        {
            scheduler.AdvanceTo(DateTime.Now);
        },
        ex => Console.WriteLine(ex.Message),
        () => Console.WriteLine("Done"));

    var combinedRollingBuffer = combined
            .RollingBufferDeltaChange(TimeSpan.FromSeconds(bufferDuration), scheduler)
            .Publish()
            .RefCount();

    // use a rolling buffer of "bufferDuration" length 
    var combinedBuffer = combinedRollingBuffer
            //.Select(x => x.Sum());
            .Select(x => x.Last() - x.First());

    combinedBuffer.Timestamp(scheduler).Dump($"{bufferDuration}'' Rolling buffer aggregation");

    // display the values that are included in each window of the rolling buffer
    combinedRollingBuffer
    .Select(x => string.Join(",", x))
    .Timestamp(scheduler).Dump($"{bufferDuration}'' Rolling buffer lists");

    scheduler.Start();
}

class ValueTime
{
    public long Balance;
    public string Ts;
}

static class Extensions
{
    public static IObservable<T[]> RollingBufferDeltaChange<T>(
        this IObservable<T> @this,
        TimeSpan buffering, IScheduler scheduler)
    {
        return Observable.Create<T[]>(o =>
        {
            var list = new LinkedList<Timestamped<T>>();
            return @this.Timestamp(scheduler).Subscribe(tx =>
            {
                list.AddLast(tx);
                Console.WriteLine($"{scheduler.Now} Adding Tx: {tx.Timestamp}  {tx.Value} list.First: {list.First.Value.Timestamp} {list.First.Value.Value} list.Last: {list.Last.Value.Timestamp} {list.Last.Value.Value}");

                DateTime nowTime = scheduler.Now.DateTime; // DateTime.Now;

                System.Reactive.Timestamped<T> el = default(System.Reactive.Timestamped<T>);
                while (list.Count > 1 && list.First.Value.Timestamp < nowTime.Subtract(buffering))
                {
                    el = list.First.Value;
                    list.RemoveFirst();

                    Console.WriteLine($"{scheduler.Now} Removing el: {el.Timestamp}  {el.Value} {list.Count}");
                }

                if (el != default(System.Reactive.Timestamped<T>) && (list.Count <= 1 || list.First.Value.Timestamp > nowTime.Subtract(buffering)))
                {
                    list.AddFirst(el);
                    Console.WriteLine($"{scheduler.Now} Adding el: {el.Timestamp}  {el.Value} {list.Count}");
                    if (list.Count > 0)
                    {
                        Console.WriteLine($"{scheduler.Now} el: {el.Timestamp}  {el.Value} list.First: {list.First.Value.Timestamp} {list.First.Value.Value} list.Last: {list.Last.Value.Timestamp} {list.Last.Value.Value}");
                    }
                }

                o.OnNext(list.Select(tx2 => tx2.Value).ToArray());

            }, o.OnError, o.OnCompleted);
        });
    }
}

and it produces exactly the behavior I was looking for:

[Screenshot of the LINQPad output]

It would be a great omission if besides my original reference to James World's answer in Advanceable historical stream and live stream in Rx, I did not also acknowledge Enigmativity and his answer in reactive extensions sliding time window. Without their help not only in these specific posts, but in numerous others, I would not stand a chance in using Rx in my code, so a big thank you to both of them.
