
I have an observable which streams a value for each millisecond; this happens in batches every 250 ms (meaning roughly 250 values per 250 ms, give or take).

Mock sample code:

    // emit a batch of 250 samples every 250 ms
    IObservable<IEnumerable<int>> input = from _ in Observable.Interval(TimeSpan.FromMilliseconds(250))
                                          select CreateSamples(250);

    input.Subscribe(values =>
    {
        foreach (var value in values)
        {
            Console.WriteLine("Value : {0}", value);
        }
    });

    Console.ReadKey();


    private static IEnumerable<int> CreateSamples(int count)
    {
        // yield 'count' sequential samples
        for (int i = 0; i < count; i++)
        {
            yield return i;
        }
    }

What I need is to create some form of "process" observable which processes the input observable at a rate of 8 values every 33 ms.

Something along the lines of this:

    IObservable<IEnumerable<int>> process = from _ in Observable.Interval(TimeSpan.FromMilliseconds(33))
                                            select stream.Take(8);

I was wondering two things:

1) How can I write the first sample with the built-in operators that Reactive Extensions provides?

2) How can I create that "process" stream which takes values from the input stream with the behavior I've described?

I tried using Window, as suggested in the comments below.

 input.Window(TimeSpan.FromMilliseconds(33)).Take(8).Subscribe(winObservable => Debug.WriteLine(" !! "));

It seems as though I get 8, and only 8, observables of an unknown number of values.

What I require is a recurrence of 8 values every 33 ms from the input observable.

What the code above did is produce 8 observables of IEnumerable and then stand idle.
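
(For reference, a sketch of the Window + Take(8) combination suggested in the comments, applied here to a flattened stream — untested, and assuming `using System.Reactive.Linq;` and the `input` observable from the sample above:)

    var windowed = input
        .SelectMany(batch => batch)                 // flatten each IEnumerable<int> batch into single values
        .Window(TimeSpan.FromMilliseconds(33))      // open a new window every 33 ms
        .SelectMany(window => window.Take(8));      // emit at most 8 values per window

    windowed.Subscribe(value => Console.WriteLine("Value : {0}", value));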

EDIT: Thanks to James World, here's a sample.

  var input = Observable.Range(1, int.MaxValue);

  var timedInput = Observable.Interval(TimeSpan.FromMilliseconds(33))
        .Zip(input.Buffer(8), (_, buffer) => buffer);

  timedInput.SelectMany(x => x).Subscribe(Console.WriteLine);

But now it gets trickier: I need the Buffer size to be calculated from the actual number of milliseconds that passed between Intervals. When you write TimeSpan.FromMilliseconds(33), the Interval event of the timer is actually raised after around 45 ms, give or take.

Is there any way to calculate the buffer size? Something like this pseudocode:

  input.TimeInterval().Buffer( s => s.Interval.Milliseconds / 4)
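
(A rough sketch of one way that pseudocode might be expanded — the queue, the 33 ms tick, and the one-sample-per-elapsed-millisecond rate are my assumptions, not part of the original code; it also assumes the original batched `input` observable from the top of the question rather than the Observable.Range used in this edit, plus `using System.Collections.Concurrent;`, `using System.Collections.Generic;` and `using System.Reactive.Linq;`:)

    // Pace the flattened samples by the *actual* time that elapsed between timer ticks.
    var queue = new ConcurrentQueue<int>();

    input.Subscribe(batch =>
    {
        foreach (var sample in batch)
            queue.Enqueue(sample);                            // ~250 samples enqueued every ~250 ms
    });

    var paced = Observable.Interval(TimeSpan.FromMilliseconds(33))
        .TimeInterval()                                       // measures the real gap between ticks
        .Select(tick =>
        {
            var take = (int)tick.Interval.TotalMilliseconds;  // e.g. ~45 samples when the timer fires late
            var buffer = new List<int>(take);
            int value;
            for (int i = 0; i < take && queue.TryDequeue(out value); i++)
                buffer.Add(value);
            return buffer;
        });

    paced.Subscribe(buffer => Console.WriteLine("Released {0} samples", buffer.Count));
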
  • It's just a [Window](http://www.introtorx.com/content/v1.0.10621.0/17_SequencesOfCoincidence.html#Window) over 33ms followed by `.Take(8)` – zerkms Mar 08 '15 at 22:05
  • this gives me an `IObservable<IObservable<IEnumerable<int>>>` and not the `IObservable<IEnumerable<int>>` I expected. How would you extract the values for the observer? – eran otzap Mar 08 '15 at 22:14
  • It is mentioned in the article: http://www.introtorx.com/content/v1.0.10621.0/17_SequencesOfCoincidence.html#FlatteningAWindowOperation + http://www.introtorx.com/content/v1.0.10621.0/07_Aggregation.html#NestedObservables + http://www.introtorx.com/content/v1.0.10621.0/08_Transformation.html#SelectMany – zerkms Mar 08 '15 at 22:18
  • @zerkms I'll try it out tomorrow and let you know. Did you see the edit to my question? It does seem as though it happens 8 times, but I don't want 8 observables; I want 8 values from the input observable every 33 ms. What I got is 8, and only 8, observables of who knows how many values, and then it stood idle. – eran otzap Mar 08 '15 at 22:26
  • 1. Window function chunks the input stream by 33ms 2. Take(8) takes 8 elements out of every chunk. You just need to combine these 2 operators properly. For example - `.Window(TimeSpan.FromMilliseconds(33)).SelectMany(i => i.Take(8))` PS: I don't develop in C# and don't use Rx.NET so this may be not exactly correct, but just follow the idea. – zerkms Mar 08 '15 at 22:28
  • OK, I'll try it again tomorrow. Thanks in the meantime. – eran otzap Mar 08 '15 at 22:31
  • What do you want to happen if the source produces more than 8 values in the 33ms timespan? What do you want to happen if fewer than 8 are produced? – Foole Mar 08 '15 at 22:31
  • There are certainly more elements; at any given point I would expect there to be 250 elements, which I want to break up into smaller chunks at a specific rate. – eran otzap Mar 08 '15 at 22:34
  • OK, the behavior I'm trying to achieve is a queue which enqueues 250 samples every 250 ms and dequeues 33 samples every 33 ms (not 8). – eran otzap Mar 08 '15 at 22:35

2 Answers


You won't be able to do this with any kind of accuracy with a reasonable solution, because the .NET timer resolution is around 15 ms.

If the timer were fast enough, you would have to flatten and repackage the stream with a pacer, something like:

// flatten stream
var fs = input.SelectMany(x => x);

// buffer 8 values and release every 33 milliseconds
var xs = Observable.Interval(TimeSpan.FromMilliseconds(33))
                   .Zip(fs.Buffer(8), (_,buffer) => buffer);

Although as I said, this will give very jittery timing. If that kind of timing resolution is important to you, go native!
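
For completeness, a sketch of consuming `xs` (mirroring the flattening in the question's edit) might look like this:

    // flatten each released buffer back into individual values and print them
    xs.SelectMany(buffer => buffer)
      .Subscribe(Console.WriteLine);

    Console.ReadKey();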

James World
  • I know, that's going to be the second part. Based on TimeInterval I will decide how many values to take, or I might use a timer based on the Win32 API that I found on CodeProject some time ago. – eran otzap Mar 09 '15 at 06:04
  • ...when I said go native, I meant really go native. Like unmanaged C++. You'll have a bunch of things to contend with in managed code - jitting, GC etc. Hard to comment without knowing your scenario, but .NET is generally not the tool for code with high-precision timing requirements. – James World Mar 09 '15 at 08:31
  • Do you know of any examples where I can call the system timer from unmanaged code in a .NET application? – eran otzap Mar 09 '15 at 09:03
  • I'm using something like this: http://www.codeproject.com/Articles/5501/The-Multimedia-Timer-for-the-NET-Framework which wraps the Win32 API; it performs fairly well. – eran otzap Mar 09 '15 at 09:05
  • As I am sure you are discovering, there isn't one "system timer" but a range of them. You can even get peripheral timers. But the fundamental problem isn't limited to the timer really. As soon as you are in a world with managed threads, your timing is going to be low resolution. You can write managed code that minimizes GC and avoids callbacks and thread switches - but that's likely to lead to an unmaintainable nightmare. I don't know what you are trying to achieve, so I don't know how feasible your approach is. Just make sure you are using the right tool for the job - may well be not .NET. – James World Mar 09 '15 at 09:13
  • Thanks for the advice. I need millisecond resolution represented on a screen for a plotter. I have no problem with the short delay of the .NET timer; I just know that my 33 is actually more like 50. – eran otzap Mar 09 '15 at 09:19
  • The windows system timer (used by Win32 Native API as well as the .NET timer) runs at 60Hz, giving you a resolution of 16ms. If you need higher resolution than that, you'll need to use a "high resolution timer". This SO question presents some options for that: http://stackoverflow.com/questions/7137121/c-sharp-high-resolution-timer – Brandon Mar 09 '15 at 13:20
  • @JamesWorld Thanks for all your help. I've made an edit to my question, but it turns out I have a trickier scenario, could you have a look? – eran otzap Mar 09 '15 at 14:29
  • @JamesWorld Could you please have a look at something? Thanks: http://stackoverflow.com/questions/29755137/rx-zip-outputs-an-unexpected-result/29765124#29765124 – eran otzap Apr 26 '15 at 13:13

I agree with James' analysis.

I'm wondering if this query gives you a better result:

IObservable<IList<int>> input =
    Observable
        .Generate(
            0,
            x => true,
            x => x < 250 ? x + 1 : 0,
            x => x,
            x => TimeSpan.FromMilliseconds(33.0 / 8.0))
        .Buffer(TimeSpan.FromMilliseconds(33.0));
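
A quick way to inspect what this emits (assuming a console app with System.Reactive referenced) might be:

    // print the size and contents of each ~33 ms buffer; ideally this hovers around 8 values
    input.Subscribe(buffer =>
        Console.WriteLine("Got {0} values: {1}", buffer.Count, string.Join(", ", buffer)));

    Console.ReadKey();
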
Enigmativity
  • Would you mind having a look at another question: http://stackoverflow.com/questions/29755137/rx-zip-outputs-an-unexpected-result/29765124#29765124 – eran otzap Apr 26 '15 at 13:13