1

I have a remote program that sends an updated measurement every 10 milliseconds over a socket connection. In my client program I have wrapped this socket in an observable that generated these measurements. For my usecase it's important that the measurement arrive at 10 millisecond intervals. Of course this is not happening since network delays make it arrive a little earlier or later each message.

So basically what I have on my remote pc is program that sends this on a socket connection.

-- is 10 milliseconds

o--o--o--o--o--o--o--o--o--o--...

Which becomes this on my client due to network delays.

o-o---o-o--o---o--o-o--o-o-...

Now in my observable I want to "normalise" this so it will again emit a value each 10 millisecond.

--o--o--o--o--o--o--o--o--o--o...

Of course this will mean I will have to introduce a buffer time that it will store values and emit them on 10 millisecond interval. Is there a way I can accomplish this?

Here is some test code that will emit the event according to the way I described above.

using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;

public class Program
{
    protected static event EventHandler<EventArgs> CancelEvent;

    private static Random random = new Random();

    private static double GetRandomNumber(double minimum, double maximum)
    { 
        return random.NextDouble() * (maximum - minimum) + minimum;
    }

    public static void Main()
    {
        var completed = false;

        var scheduler = new TestScheduler();

        var observable = Observable
            .Interval(TimeSpan.FromMilliseconds(7.0), scheduler)
            .SelectMany(e => Observable
                .Return(e, scheduler)
                .Delay(TimeSpan.FromMilliseconds(GetRandomNumber(0.0, 6.0)), scheduler)
            )
            .TimeInterval(scheduler)
            .Select(t => t.Interval.Milliseconds);

        var fromEvent = Observable.FromEventPattern<EventArgs>(
            p => CancelEvent += p,
            p => CancelEvent -= p,
            scheduler
        );

        var cancellable = observable.TakeUntil(fromEvent);

        var results = new List<int>();

        using (cancellable.Subscribe(
            results.Add,
            e => { throw new Exception("No exception is planned! {0}", e); },
            () => { completed = true; })
        )
        {
            scheduler.AdvanceBy(TimeSpan.FromSeconds(3.5).Ticks);
            CancelEvent(null, new EventArgs());
            scheduler.AdvanceBy(TimeSpan.FromSeconds(3).Ticks);
        }

        Console.WriteLine("Have I completed indeed? {0}", completed);
        Console.WriteLine("What emit time deltas been registered before cancellation?\n\t{0}", string.Join("ms\n\t", results));
    }
}
  • Can you simply `.Zip` them with a `.Interval(TimeSpan.FromMilliseconds(10.0))`? – Enigmativity Aug 30 '17 at 02:23
  • 1
    Also, keep in mind that Windows is not a **real-time operating system** and you can't get timings down to 10 ms no matter what you do. – Enigmativity Aug 30 '17 at 02:24
  • 1
    Zip will only work if the interval is always slower than source. If there's a long delay, followed by a burst, then they'll all bunch out at the same time. – Shlomo Aug 30 '17 at 04:18

1 Answers1

2

This is theoretically similar to A way to push buffered events in even intervals.

That solution would look like this:

var source = new Subject<double>();
var bufferTime = TimeSpan.FromMilliseconds(100);
var normalizedSource = source
    .Delay(bufferTime)
    .Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromMilliseconds(10)));

...with Drain defined as follows:

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}

However, I think you're going to run into problems with the 10 millisecond qualifier. That's too small a time to schedule. If I remember correctly, any delay less than 15ms is ignored by the schedulers and fired immediately. Given that, even if you used a larger interval (I tried with 100 ms), you're going to get some variance thanks to OS context switching, etc..

Shlomo
  • 14,102
  • 3
  • 28
  • 43