I have a remote program that sends an updated measurement every 10 milliseconds over a socket connection. In my client program I have wrapped this socket in an observable that generated these measurements. For my usecase it's important that the measurement arrive at 10 millisecond intervals. Of course this is not happening since network delays make it arrive a little earlier or later each message.
So basically what I have on my remote pc is program that sends this on a socket connection.
--
is 10 milliseconds
o--o--o--o--o--o--o--o--o--o--...
Which becomes this on my client due to network delays.
o-o---o-o--o---o--o-o--o-o-...
Now in my observable I want to "normalise" this so it will again emit a value each 10 millisecond.
--o--o--o--o--o--o--o--o--o--o...
Of course this will mean I will have to introduce a buffer time that it will store values and emit them on 10 millisecond interval. Is there a way I can accomplish this?
Here is some test code that will emit the event according to the way I described above.
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;
public class Program
{
protected static event EventHandler<EventArgs> CancelEvent;
private static Random random = new Random();
private static double GetRandomNumber(double minimum, double maximum)
{
return random.NextDouble() * (maximum - minimum) + minimum;
}
public static void Main()
{
var completed = false;
var scheduler = new TestScheduler();
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(7.0), scheduler)
.SelectMany(e => Observable
.Return(e, scheduler)
.Delay(TimeSpan.FromMilliseconds(GetRandomNumber(0.0, 6.0)), scheduler)
)
.TimeInterval(scheduler)
.Select(t => t.Interval.Milliseconds);
var fromEvent = Observable.FromEventPattern<EventArgs>(
p => CancelEvent += p,
p => CancelEvent -= p,
scheduler
);
var cancellable = observable.TakeUntil(fromEvent);
var results = new List<int>();
using (cancellable.Subscribe(
results.Add,
e => { throw new Exception("No exception is planned! {0}", e); },
() => { completed = true; })
)
{
scheduler.AdvanceBy(TimeSpan.FromSeconds(3.5).Ticks);
CancelEvent(null, new EventArgs());
scheduler.AdvanceBy(TimeSpan.FromSeconds(3).Ticks);
}
Console.WriteLine("Have I completed indeed? {0}", completed);
Console.WriteLine("What emit time deltas been registered before cancellation?\n\t{0}", string.Join("ms\n\t", results));
}
}