5

I have a sequence of stock ticks coming in, and I want to take all the data from the last hour and do some processing on it. I am trying to achieve this with Reactive Extensions (Rx) 2.0. I read in another post that I should use Interval, but I think that is deprecated.

NeddySpaghetti
  • Do you want to have the last hour of values each and every time a new value comes in or do you just want an hour's worth of stock ticks each hour? – Enigmativity Jul 20 '12 at 01:32
  • I want the last hour of values each and every time a new value comes in. I've looked into Buffer but I don't think it is the right one. – NeddySpaghetti Jul 25 '12 at 08:02

5 Answers

9

Would this extension method solve your problem?

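using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;

public static class RollingBufferExtensions // class name is arbitrary
{
    public static IObservable<T[]> RollingBuffer<T>(
        this IObservable<T> @this,
        TimeSpan buffering)
    {
        return Observable.Create<T[]>(o =>
        {
            // Values seen so far, oldest first, each stamped with its arrival time.
            var list = new LinkedList<Timestamped<T>>();
            return @this.Timestamp().Subscribe(tx =>
            {
                list.AddLast(tx);
                // Drop everything older than the buffering period, measured from the newest value.
                var cutoff = tx.Timestamp - buffering;
                while (list.Count > 0 && list.First.Value.Timestamp < cutoff)
                {
                    list.RemoveFirst();
                }
                // Emit a snapshot of the current window each time a value arrives.
                o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
            }, ex => o.OnError(ex), () => o.OnCompleted());
        });
    }
}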
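The eviction rule behind this approach (append the new value, then drop everything older than the buffering period) can be checked in isolation without Rx. The following is only a minimal sketch under stated assumptions: `WindowSketch`, `Snapshots` and the sample ticks are hypothetical names and data made up for illustration, and age is measured from the newest sample's timestamp rather than the wall clock so that the result is deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class WindowSketch // hypothetical helper, not part of Rx
{
    // For each incoming sample, returns the values whose timestamps fall
    // within `length` of that sample (the newest sample is always included).
    public static List<double[]> Snapshots(
        IEnumerable<(DateTimeOffset Time, double Value)> samples, TimeSpan length)
    {
        var window = new LinkedList<(DateTimeOffset Time, double Value)>();
        var result = new List<double[]>();
        foreach (var sample in samples)
        {
            window.AddLast(sample);
            // Evict samples older than `length`, measured from the newest sample.
            var oldest = sample.Time - length;
            while (window.Count > 0 && window.First.Value.Time < oldest)
                window.RemoveFirst();
            result.Add(window.Select(s => s.Value).ToArray());
        }
        return result;
    }

    public static void Main()
    {
        // Made-up ticks: 09:00, 09:30 and 10:01.
        var start = new DateTimeOffset(2012, 7, 20, 9, 0, 0, TimeSpan.Zero);
        var ticks = new[]
        {
            (start, 10.0),
            (start.AddMinutes(30), 11.0),
            (start.AddMinutes(61), 12.0), // the 09:00 tick is now more than one hour old
        };
        foreach (var snapshot in Snapshots(ticks, TimeSpan.FromHours(1)))
            Console.WriteLine(string.Join(", ", snapshot));
    }
}
```

With a one-hour window, the three snapshots should contain one, two and two values respectively: the 09:00 tick is evicted once the 10:01 tick arrives.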
Enigmativity
4

You are looking for the Window operators! Here is a lengthy article I wrote on working with sequences of coincidence (overlapping windows of sequences): http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html

So if you wanted to build a rolling aggregate, such as a rolling sum or average, you could use code like this:

// TestScheduler and Recorded<T> live in the Microsoft.Reactive.Testing namespace.
var scheduler = new TestScheduler();
var notifications = new Recorded<Notification<double>>[30];
for (int i = 0; i < notifications.Length; i++)
{
  // 1,000,000 ticks = 0.1 seconds
  notifications[i] = new Recorded<Notification<double>>(i*1000000, Notification.CreateOnNext<double>(i));
}
//Push values into an observable sequence 0.1 seconds apart with values from 0 to 29
var source = scheduler.CreateHotObservable(notifications);

source.GroupJoin(
      source,   //Take values from myself
      _=>Observable.Return(0, scheduler), //Just the first value
      _=>Observable.Timer(TimeSpan.FromSeconds(1), scheduler),//Window period, change to 1hour
      (lhs, rhs)=>rhs.Sum())    //Aggregation you want to do.
    .Merge()    //Flatten the IObservable<IObservable<double>> of sums into a sequence of double
    .Subscribe(i=>Console.WriteLine (i));
scheduler.Start();

We can see that it outputs the rolling sums as it receives values:

0, 1, 3, 6, 10, 15, 21, 28...

Lee Campbell
  • I can't get this code to work. `rhs.Sum()` produces `IObservable<double>` not `double`, so I'm assuming `.Concat()` is needed as well? But even then only zeros are outputted – Matt Thomas Sep 07 '17 at 13:51
  • Apologies, I have corrected the answer. To be accurate when using the `TestScheduler`, `Observable.Return(0)` needs the scheduler passed in too, i.e. `Observable.Return(0, scheduler)`. Perhaps a more meaningful value would be `Observable.Return(Unit.Default, scheduler)` or `Observable.Empty(scheduler)` – Lee Campbell Sep 07 '17 at 23:08
  • @LeeCampbell I am having trouble adapting your solution to my slightly different needs. I need to get two values (min and max), not just one. I tried two approaches. First, I output a Tuple of `(rhs.MinBy(...), rhs.MaxBy(...))` at the aggregation step and then used it in `.Subscribe(diff => Math.Avg(diff.Item1, diff.Item2))` (just an example). This does not work, as the compiler cannot convert the diff arguments to double/decimal. Then I tried just outputting `(_, rhs) => rhs.Subscribe(...)` and calculating the min/max in there, but that did not work, and neither did using .Scan. Do you see what can be done, please? – pun11 Jun 10 '19 at 08:35
  • @LeeCampbell Maybe better put, I do not know how to get the value from the stream to perform calculations with - ie "convert" from `System.IObservable>>` to decimal/double/any other object I am getting from the stream. I do not want to resort to using .Wait(), .Last() or other blocking constructs. Your input is highly appreciated, I love your introtorx site btw. – pun11 Jun 10 '19 at 08:44
  • @pun11 create a new post with your unit test that you are struggling to get to pass. cc me in the post, but I bet the community will answer quicker than I will. – Lee Campbell Oct 04 '19 at 14:00
1

Very likely Buffer is what you are looking for:

var hourlyBatch = ticks.Buffer(TimeSpan.FromHours(1));
Anderson Imes
1

Or, assuming the data is already timestamped, simply use Scan:

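    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive;
    using System.Reactive.Linq;

    public static class SlidingWindowExtensions // class name is arbitrary
    {
        public static IObservable<IReadOnlyList<Timestamped<T>>> SlidingWindow<T>(this IObservable<Timestamped<T>> self, TimeSpan length)
        {
            return self.Scan(new LinkedList<Timestamped<T>>(),
                             (ll, newSample) =>
                             {
                                 ll.AddLast(newSample);
                                 // Evict samples more than `length` older than the newest one.
                                 var oldest = newSample.Timestamp - length;
                                 while (ll.Count > 0 && ll.First.Value.Timestamp < oldest)
                                     ll.RemoveFirst();

                                 return ll;
                             }).Select(l => l.ToList().AsReadOnly());
        }
    }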
kwesolowski
0

https://github.com/Froussios/New-Intro-To-Rx/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md#overlapping-buffers-by-time

Console.WriteLine($"{DateTime.Now:T}: Start");

Observable
    .Interval(TimeSpan.FromSeconds(1)).Take(5)
    .Buffer(TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(1))
    .Subscribe(x => { Console.WriteLine($"{DateTime.Now:T}: {string.Join(", ", x.ToArray())}"); });

await Task.Delay(TimeSpan.FromSeconds(10));

Console.WriteLine($"{DateTime.Now:T}: End");

Output:

    17:07:27: Start
    17:07:29: 0, 1
    17:07:30: 0, 1, 2
    17:07:31: 2, 3
    17:07:32: 3, 4
    17:07:32: 4
    17:07:37: End
flibustier