6

How can you do in RX a simple, stateful transform of a sequence?

Say we want to make an exponential moving average transform of a IObservable noisySequence.

Whenever noisySequence ticks, emaSequence should tick and return the value (previousEmaSequenceValue*(1-lambda) + latestNoisySequenceValue*lambda)

I guess we use Subjects, but how exactly?

    public static void Main()
    {

        var rand = new Random();

        IObservable<double> sequence  = Observable
            .Interval(TimeSpan.FromMilliseconds(1000))
            .Select(value => value + rand.NextDouble());

        Func<double, double> addNoise = x => x + 10*(rand.NextDouble() - 0.5);

        IObservable<double> noisySequence = sequence.Select(addNoise);

        Subject<double> exponentialMovingAverage = new Subject<double>(); // ??? 


        sequence.Subscribe(value => Console.WriteLine("original sequence "+value));
        noisySequence.Subscribe(value => Console.WriteLine("noisy sequence " + value));
        exponentialMovingAverage.Subscribe(value => Console.WriteLine("ema sequence " + value));

        Console.ReadLine();
    }
Sputnik2513
  • 111
  • 1
  • 5
  • 1
    To clarify, I'm less interested in a specific method to do an average, but rather generic ways to make stateful transforms. – Sputnik2513 Feb 11 '13 at 06:01

3 Answers3

7

This is how you can attach state to a sequence. In this case it calculates the average of the last 10 values.

var movingAvg = noisySequence.Scan(new List<double>(),
(buffer, value)=>
{
    buffer.Add(value);
    if(buffer.Count>MaxSize)
    {
        buffer.RemoveAt(0);
    }
    return buffer;
}).Select(buffer=>buffer.Average());

But you could use Window (which Buffer is sort of a generalisation of) to get your average too.

noisySequence.Window(10)
   .Select(window=>window.Average())
   .SelectMany(averageSequence=>averageSequence);
Lee Campbell
  • 10,631
  • 1
  • 34
  • 29
  • Tangential: your articles about `Window`, `Buffer`, `Join`, and `GroupJoin` were extremely illuminating - in fact, I believe I've 'quoted' you a number of times answering Rx-related questions. :) – JerKimball Feb 11 '13 at 16:15
4

For many of these types of calculations, Buffer is the easiest way

var movingAverage = noisySequence.Buffer(/*last*/ 3,
    /*move forward*/ 1 /*at a time*/)
    .Select(x => (x[0] + x[1] + x[2]) / 3.0);

If you need to carry state around, use the Scan operator, which is like Aggregate except that it yields values every iteration.

Edit: fixed comment syntax

Le Droid
  • 4,534
  • 3
  • 37
  • 32
Ana Betts
  • 73,868
  • 16
  • 141
  • 209
  • That works in this specific example of an average. More generally, if we need internal states, how best to do it in RX? – Sputnik2513 Feb 11 '13 at 05:58
  • Also that has no states. To implement an exponential moving average, you need to know the last value of the EMA. – Sputnik2513 Feb 11 '13 at 06:04
  • As Paul said, you can use Scan which allows you to aggregate on-the-fly by starting with a seed "state" and performing some action on that state for each OnNext. You can then select from the state the value you wish to publish out of the sequence. If however you need access to more than just the current aggregate and the newest value, then the Window operators maybe of use. – Lee Campbell Feb 11 '13 at 14:03
1

Thanks! Here is a solution using Scan

    const double lambda = 0.99;
    IObservable<double> emaSequence = noisySequence.Scan(Double.NaN, (emaValue, value) =>
        {
            if (Double.IsNaN(emaValue))
            {
                emaValue = value;
            }
            else
            {
                emaValue = emaValue*lambda + value*(1-lambda);
            }
            return emaValue;
        }).Select(emaValue => emaValue);
Sputnik2513
  • 111
  • 1
  • 5