I'm just trying to get my head around Rx.
I am using Rx to poll a website every 2 seconds:
var results = new List<MyDTO>();
var cx = new WebserviceAPI( ... );
var callback = cx.GetDataAsync().Subscribe(rs => { results.AddRange(rs); });
var poller = Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe( _ => { cx.StartGetDataAsync(); });
(The web service API exposes a getItemsAsync/getItemsCompleted event-based mechanism, from which I am creating an observable.)
When the website responds, I unpack the "business part" of the response into an IEnumerable of DTOs:
public IObservable<IEnumerable<MyDTO>> GetDataAsync()
{
    var o = Observable.FromEventPattern<getItemsCompletedEventHandler, getItemsCompletedEventArgs>(
        h => _webService.getItemsCompleted += h,
        h => _webService.getItemsCompleted -= h);
    return o.Select(c => from itm in c.EventArgs.Result.ItemList
                         select new MyDTO()
                         {
                             ...
                         });
}
My reasoning was that, since all the data was already there in the response, it made sense to pack it up there and then into an IEnumerable ... but now I'm not sure that is right!
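One thing worth keeping in mind about that projection: a LINQ query is evaluated lazily, every time it is enumerated, not when it is built. The sketch below is a stand-alone illustration (`DeferredDemo` and its members are hypothetical names, unrelated to the web service API) of the difference between a lazy query and a list materialized with ToList(), which may matter if the IEnumerable is enumerated more than once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeferredDemo
{
    // Returns how many times the projection had run after each step.
    public static int[] Run()
    {
        var calls = 0;
        var source = new[] { 1, 2, 3 };

        // Building the query runs nothing yet.
        IEnumerable<string> lazy = source.Select(n => { calls++; return n.ToString(); });
        var afterBuild = calls;                 // 0

        // ToList() enumerates the query once and stores the results.
        List<string> snapshot = lazy.ToList();
        var afterFirstToList = calls;           // 3

        // Enumerating the stored list does not re-run the projection.
        foreach (var item in snapshot) { }
        var afterSnapshotLoop = calls;          // still 3

        // Enumerating the lazy query again re-runs the projection.
        foreach (var item in lazy) { }
        var afterLazyLoop = calls;              // 6

        return new[] { afterBuild, afterFirstToList, afterSnapshotLoop, afterLazyLoop };
    }
}
```

If each result set should be projected exactly once, appending ToList() to the query inside the Select is one option.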
If the website takes longer than 2 seconds to respond, MSTest crashes. When debugging, the error generated is
"There was an error during asynchronous processing. Unique state object is required for multiple asynchronous simultaneous operations to be outstanding"
with the inner exception
"Item has already been added. Key in dictionary: 'System.Object' Key being added: 'System.Object'"
I suppose the problem is one of re-entrancy: the next call starts and returns data before the previous call has finished populating the data.
So I'm not sure whether
- I have put this together correctly
- I should be throttling the connection in some way so as to avoid re-entrancy
- I should use a different intermediate data structure (or mechanism) instead of an IEnumerable
I would appreciate some guidance.
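On the throttling point, one possible approach is to schedule the next poll only after the previous request has finished, instead of firing on a fixed Interval. This is a minimal sketch, assuming the System.Reactive package and assuming each request can be exposed as an observable that completes after its result (for example by applying Take(1) to the event stream). `Polling` and `PollSequentially` are hypothetical names.

```csharp
using System;
using System.Reactive.Linq;

public static class Polling
{
    // Repeatedly: wait `period`, run one request, wait for it to complete, start over.
    // Repeat() only resubscribes after the inner request has completed,
    // so two requests are never in flight at the same time.
    public static IObservable<T> PollSequentially<T>(
        Func<IObservable<T>> request, TimeSpan period)
    {
        return Observable
            .Timer(period)               // emits a single tick after `period`, then completes
            .SelectMany(_ => request())  // start one request when the tick arrives
            .Repeat();                   // resubscribe (new timer, new request) on completion
    }
}
```

Note that the gap between polls becomes the period plus the response time, and that an error from `request` ends the whole sequence, because Repeat resubscribes only on completion. Whether that is acceptable depends on how strictly the 2 second cadence must be kept.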
EDIT 1: I have changed the web call to include a unique state object:
public void StartGetDataAsync()
{
    ...
    // was: _webService.getItemsAsync(request);
    _webService.getItemsAsync(request, Guid.NewGuid());
}
That made it work, but I am still unsure whether it is the right way to do it.
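The unique state object can also be used to match each completed event to the call that started it. The following is a sketch under stated assumptions: `FakeService` is a hypothetical stand-in for the generated proxy (it raises its event synchronously, which the real proxy does not), and `OneShot.Call` is a hypothetical helper. It assumes the System.Reactive package.

```csharp
using System;
using System.ComponentModel;
using System.Reactive.Linq;

// Hypothetical stand-in for the generated proxy: one Start method, one Completed event.
public sealed class FakeService
{
    public event EventHandler<AsyncCompletedEventArgs> Completed;

    public void StartAsync(object userState)
    {
        // A real proxy would raise this later, when the response arrives.
        Completed?.Invoke(this, new AsyncCompletedEventArgs(null, false, userState));
    }
}

public static class OneShot
{
    // Each subscription starts one call and yields only the completion
    // whose UserState is the token created for that call.
    public static IObservable<AsyncCompletedEventArgs> Call(FakeService service)
    {
        return Observable.Create<AsyncCompletedEventArgs>(observer =>
        {
            var token = new object(); // unique per call

            var subscription = Observable
                .FromEventPattern<AsyncCompletedEventArgs>(
                    h => service.Completed += h,
                    h => service.Completed -= h)
                .Select(e => e.EventArgs)
                .Where(args => ReferenceEquals(args.UserState, token))
                .Take(1) // complete after the matching event, detaching the handler
                .Subscribe(observer);

            // Start the call only after the handler is attached.
            service.StartAsync(token);
            return subscription;
        });
    }
}
```

The sketch does not inspect `args.Error` or `args.Cancelled`; a fuller version would probably turn a non-null Error into OnError.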
EDIT 2 - Web service signatures: I'm consuming a SOAP web service, which the webServiceApi class wraps. The generated references.cs contains the following methods:
public void getItemsAsync(GetItemsReq request, object userState)
{
    if ((this.getItemsOperationCompleted == null))
    {
        this.getItemsOperationCompleted = new System.Threading.SendOrPostCallback(this.OngetItemsOperationCompleted);
    }
    this.InvokeAsync("getItems", new object[] {
                request}, this.getItemsOperationCompleted, userState);
}
private System.Threading.SendOrPostCallback getItemsOperationCompleted;
public event getItemsCompletedEventHandler getItemsCompleted;
public delegate void getItemsCompletedEventHandler(object sender, getItemsCompletedEventArgs e);
public partial class getItemsCompletedEventArgs : System.ComponentModel.AsyncCompletedEventArgs
{
...
}
private void OngetItemsOperationCompleted(object arg)
{
    if ((this.getItemsCompleted != null))
    {
        System.Web.Services.Protocols.InvokeCompletedEventArgs invokeArgs = ((System.Web.Services.Protocols.InvokeCompletedEventArgs)(arg));
        this.getItemsCompleted(this, new getItemsCompletedEventArgs(invokeArgs.Results, invokeArgs.Error, invokeArgs.Cancelled, invokeArgs.UserState));
    }
}
I have probably given you too much (or missed something)!
Thx