
I have a 'hot' data source, which exposes an event containing the data I'm interested in.

I've created a 'wrapper' which uses Observable.FromEventPattern to expose my data source as an Observable.

The problem I'm having is that using the wrapper, I always lose the first few data items, but only the first time I run it.

Some code:

Internally, the Wrapper class uses a RoutedObservable (successor of this) to allow Subscribers to 'sign up' before the Source has been wired up.

Wrapper.cs

RoutedObservable<Data> _Data = new RoutedObservable<Data>();
_Data.SetSource(Observable.FromEventPattern<EventArgs<Data>>
     (
         h => _Source.DataReceived += h,
         h => _Source.DataReceived -= h 
     )
     .Select(e => e.EventArgs.Data));

public IObservable<Data> Data { get { return _Data.Publish().RefCount(); } }

public void Start() { _Source.Start(); }
public void Stop() { _Source.Stop(); }

Test.cs

var _list = new List<Data>();
_Wrapper.Start();
var lastData = await _Wrapper.Data
             .Do(_list.Add)
             .Select(SomeConversion)
             .Take(NumberOfSamples)
             .LastAsync();
//Check lastData.Id == NumberOfSamples
// (give or take an off-by-one error)
// if the check fails, have a look inside _list to see what's missing
_Wrapper.Stop();

(The Data.Id property is re-initialised to zero by the Source on every Stop/Start)

On first run, the _list has around 8 items missing at the beginning, but on subsequent runs, everything is ok.

Have you already seen something like this and have an idea what the cause may be? Or an idea of what to change/try to work out where the problem is coming from?

Benjol

1 Answer

So, while writing the question, I had a (psychic?) inspiration, and I've found out how to make it work:

The problem was (I think) the delay caused by the initialisation that is implicitly triggered by awaiting the observable. Awaiting is what actually subscribes and, as you can see in the pseudo code, that happens after the source is started, so any items the source emits during that delay are lost.
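
To illustrate the suspected cause, here is a minimal sketch, assuming System.Reactive is referenced. A `Subject<int>` stands in for the real data source, and `LateSubscriberDemo` and its members are hypothetical names that are not part of the wrapper above. It shows that a subscriber attached to a `Publish().RefCount()` observable after items have already been emitted never sees those items:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class LateSubscriberDemo
{
    public static (List<int> Early, List<int> Late) Run()
    {
        // Assumption: a Subject plays the role of the hot data source.
        var source = new Subject<int>();
        var data = source.Publish().RefCount();

        var early = new List<int>();
        var late = new List<int>();

        // Subscribed before the first item is emitted.
        using (data.Subscribe(early.Add))
        {
            source.OnNext(0);
            source.OnNext(1);

            // Subscribed after two items have already gone by.
            using (data.Subscribe(late.Add))
            {
                source.OnNext(2);
            }
        }

        return (early, late);
    }
}
```

Calling `LateSubscriberDemo.Run()` yields an early list of 0, 1, 2 and a late list containing only 2. Nothing is buffered for the late subscriber, which matches the symptom of missing items at the beginning.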

The 'fix' I've found is to perform a 'dummy' subscribe before starting the data source (my Observable is Publish().RefCount()ed).

Create wrapper (creates data source and Observable)
empty Subscribe to observable <--------------
Start data source
await n data samples
Stop data source
Check sample[0] id == 0
Destroy wrapper (implicitly: leaves scope)
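
A variation on the steps above, sketched under assumptions rather than taken from the real wrapper: instead of an empty subscription, the real subscription can be created before the source is started, by converting the query to a task with `ToTask()` (which subscribes immediately) and awaiting that task afterwards. `SubscribeFirstSketch` is a hypothetical name, and a `Subject<int>` again stands in for the data source:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class SubscribeFirstSketch
{
    public static async Task<IList<int>> RunAsync()
    {
        // Assumption: a Subject plays the role of the hot data source.
        var source = new Subject<int>();
        var data = source.Publish().RefCount();

        // ToTask() subscribes right here, before anything is emitted.
        Task<IList<int>> pending = data.Take(3).ToList().ToTask();

        // Stand-in for "Start data source": items are emitted only now.
        for (var id = 0; id < 5; id++)
        {
            source.OnNext(id);
        }

        // The subscription already existed, so the first item (id 0) is included.
        return await pending;
    }
}
```

With this ordering the awaited list starts at id 0, because the subscription is in place before the first item is emitted. Whether this fits the real test depends on how `_Wrapper.Data` is built, so treat it as a starting point only.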

So, that explains why the first test wasn't working, but I still don't understand why subsequent tests were previously working.

Benjol
  • If all of your test runs are done from a single launch of the test application, then the first run through probably has different timings (JIT'ing, threads being added to the thread pool, etc.) that made the time between "Start data source" and "await n data samples" longer. Subsequent runs had a shorter gap because all that stuff was "warmed up". One never knows the exact reasons when race conditions are involved :D – Brandon Jul 03 '14 at 20:55