I have an observable which streams a value for each ms. , this is done every 250 ms. ( meaning 250 values in 250 ms (give or take) ).
Mock sample code :
IObservable<IEnumerable<int>> input = from _ in Observable.Interval(TimeSpan.FromMilliseconds(250))
select CreateSamples(250);
input.Subscribe(values =>
{
for (int i = 0; i < values.Count(); i++)
{
Console.WriteLine("Value : {0}", i);
}
});
Console.ReadKey();
private static IEnumerable<int> CreateSamples(int count)
{
for (int i = 0; i < 250; i++)
{
yield return i;
}
}
What i need is to create some form of process observable which process the input observable in a rate of 8 values every 33 ms
Something along the line of this :
IObservable<IEnumerable<int>> process = from _ in Observable.Interval(TimeSpan.FromMilliseconds(33))
select stream.Take(8);
I was wondering 2 things :
1) How can i write the first sample with the built in operators that reactive extensions provides ?
2) How can i create that process stream which takes values from the input stream which with the behavior iv'e described ?
I tried using Window as a suggestion from comments below .
input.Window(TimeSpan.FromMilliseconds(33)).Take(8).Subscribe(winObservable => Debug.WriteLine(" !! "));
It seems as though i get 8 and only 8 observables of an unknown number of values
What i require is a recurrence of 8 values every 33 ms. from input observable.
What the code above did is 8 observables of IEnumrable and then stand idle.
EDIT : Thanks to James World . here's a sample .
var input = Observable.Range(1, int.MaxValue);
var timedInput = Observable.Interval(TimeSpan.FromMilliseconds(33))
.Zip(input.Buffer(8), (_, buffer) => buffer);
timedInput.SelectMany(x => x).Subscribe(Console.WriteLine);
But now it get's trickier i need for the Buffer value to calculated
i need this to be done by the actual MS passed between Intervals
when you write a TimeSpan.FromMilliseconds(33) the Interval event of the timer would actually be raised around 45 ms give or take .
Is there any way to calculate the buffer , something like PSUDO
input.TimeInterval().Buffer( s => s.Interval.Milliseconds / 4)