
I'm trying to do a fully asynchronous read of a web endpoint using Rx. I got it working, in an ugly way, with something like this:

        var reqUri = new Uri(string.Format("https://cds.cern.ch/record/{0}/export/xm?ln=en", docID));
        var wr = WebRequest.CreateHttp(reqUri);

        var s = Observable
                .FromAsyncPattern<WebResponse>(
                    wr.BeginGetResponse,
                    wr.EndGetResponse)
                .Invoke()
                .Catch(Observable.Return<WebResponse>(null))
                .Select(ExtractString)
                .Select(ParseToMD);

ExtractString is a blocking method that opens the response stream and reads it to the end. I'd like to make that read asynchronous too, but I'm having some trouble doing it. I think the following should work:

        var s = Observable
                .FromAsyncPattern<WebResponse>(
                    wr.BeginGetResponse,
                    wr.EndGetResponse)
                .Invoke()
                .SelectMany(resp => Observable.Using(() => resp.GetResponseStream(), strm => Observable.Return(strm)))
                .SelectMany(resp => Observable.Using(() => new StreamReader(resp), strm => Observable.Return(strm)))
                .SelectMany(strm => Observable.FromAsync(tkn => strm.ReadToEndAsync()))
                .Select(ParseToMD);

If I put a breakpoint somewhere in there, it sometimes works; other times it just hangs. So I have two questions. First, is this the right way to do something like this? Second, what is the best way to debug something like this? Is there some tracing I can watch that shows the events flowing through the pipeline? For now I have just been setting breakpoints in the lambda functions.

The ParseToMD function just converts a string into a data class.

Gordon

1 Answer


You can trace the stream lifetimes using the Spy method I present in this question.
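For a rough idea of what such a tracing operator can look like, here is a minimal sketch assuming System.Reactive (Rx 2.x or later). The `SpyExtensions` class, the `Spy` signature and the `log` parameter are hypothetical and may differ from the linked version. It logs subscription, every OnNext/OnError/OnCompleted and disposal, together with the managed thread id.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class SpyExtensions
{
    // Hypothetical tracing operator: logs subscribe, each notification and dispose.
    // 'log' defaults to Console.WriteLine; pass your own sink to capture the output.
    public static IObservable<T> Spy<T>(this IObservable<T> source, string name,
                                        Action<string> log = null)
    {
        if (log == null) log = Console.WriteLine;
        Func<string> thread = () => " [thread " + Thread.CurrentThread.ManagedThreadId + "]";

        return Observable.Create<T>(observer =>
        {
            log(name + ": subscribed" + thread());

            var subscription = source
                .Do(x => log(name + ": OnNext(" + x + ")" + thread()),
                    ex => log(name + ": OnError(" + ex.Message + ")" + thread()),
                    () => log(name + ": OnCompleted()" + thread()))
                .Subscribe(observer);

            // Logged when the subscription is torn down (completion, error or Dispose()).
            return new CompositeDisposable(
                subscription,
                Disposable.Create(() => log(name + ": disposed" + thread())));
        });
    }
}

public static class SpyDemo
{
    public static void Main()
    {
        // Range emits 1, 2, 3 and completes; Spy logs each step to the console.
        Observable.Range(1, 3).Spy("range").Subscribe(_ => { });
    }
}
```

Chaining a `.Spy("...")` after each operator in the pipeline shows when each stage is subscribed, when it emits and when it is disposed, which makes a premature disposal easy to spot.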

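You aren't far off here. I think the problem might be that you are being a bit over-zealous with Observable.Using. Observable.Using disposes its resource as soon as the sequence returned by its factory terminates, and Observable.Return completes immediately after emitting its single value. So the response stream (and the StreamReader wrapping it) can be disposed while the downstream ReadToEndAsync is still running: a race condition between the asynchronous read and the disposal, which would explain why it sometimes works and sometimes hangs. I think this might behave more reliably: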

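        var reqUri = new Uri(
            string.Format("https://cds.cern.ch/record/{0}/export/xm?ln=en", docID));
        var wr = WebRequest.CreateHttp(reqUri);

        var s = Observable.FromAsyncPattern<WebResponse>(
                wr.BeginGetResponse,
                wr.EndGetResponse)
            .Invoke()
            // Take the response stream without wrapping it in its own Observable.Using.
            .Select(response => response.GetResponseStream())
            // The reader (and the stream it owns) stays alive until ReadToEndAsync completes.
            .SelectMany(stream =>
                Observable.Using(
                    () => new StreamReader(stream),
                    reader => Observable.StartAsync(reader.ReadToEndAsync)))
            .Select(ParseToMD);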
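To see the lifetime issue without a network call, here is a minimal sketch assuming System.Reactive. A `Disposable.Create` stands in for the response stream and a `Task.Delay` stands in for `ReadToEndAsync`; both stand-ins and the `UsingLifetimeDemo.Run` helper are hypothetical. It runs the question's shape (`Using` + `Return`, async work downstream) and the answer's shape (async work inside the `Using` factory) and records which happens first.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class UsingLifetimeDemo
{
    // Runs one pipeline and returns the order in which the events happened.
    public static List<string> Run(bool fixedShape)
    {
        var log = new List<string>();
        var disposed = new ManualResetEventSlim(false);
        Action<string> record = msg => { lock (log) log.Add(msg); };

        // Stand-in for the response stream: records when it is disposed.
        Func<IDisposable> createResource = () => Disposable.Create(() =>
        {
            record("resource disposed");
            disposed.Set();
        });

        // Stand-in for ReadToEndAsync: asynchronous work that uses the resource.
        Func<IDisposable, Task<int>> workAsync = async res =>
        {
            await Task.Delay(100);
            record("async work finished");
            return 42;
        };

        IObservable<int> pipeline = fixedShape
            // Answer's shape: the work runs inside Using, so the resource outlives it.
            ? Observable.Using(createResource, res => Observable.FromAsync(() => workAsync(res)))
            // Question's shape: Return completes at once, so Using disposes before the work ends.
            : Observable.Using(createResource, res => Observable.Return(res))
                        .SelectMany(res => Observable.FromAsync(() => workAsync(res)));

        pipeline.Wait();   // block until the pipeline completes
        disposed.Wait();   // Using may dispose just after forwarding OnCompleted
        lock (log) return new List<string>(log);
    }

    public static void Main()
    {
        Console.WriteLine("Question's shape: " + string.Join(", ", Run(fixedShape: false)));
        Console.WriteLine("Answer's shape:   " + string.Join(", ", Run(fixedShape: true)));
    }
}
```

Run as a console app, the question's shape should record the disposal before the async work finishes, and the answer's shape the other way round.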
James World
  • Eliminating the first Using (for the GetResponseStream) seems to have been the key. I guess that because the Observable.Return returned and then terminated, Using called Dispose on that stream right away, while downstream items were still being processed. I still need to understand lifetimes better here. :-) Thanks for the Spy method! – Gordon Dec 20 '13 at 16:31
  • Oh, one other thing: I think StartAsync takes a mandatory argument, the cancellation token, though ReadToEndAsync can't use it. – Gordon Dec 20 '13 at 16:31
  • I ran/wrote the above on an older version of Rx. You can safely ignore the cancellation token; it's there to pass through cancellation of the subscription in case the asynchronous task supports it. – James World Dec 20 '13 at 17:26
  • Thanks, I had done exactly that. This has made my code a lot nicer! – Gordon Dec 22 '13 at 01:04
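On the cancellation token point in the comments above, here is a minimal sketch assuming System.Reactive, with a `StringReader` standing in for the response stream so it runs offline (`StartAsyncDemo` and `ReadAll` are hypothetical names). It uses the `StartAsync` overload whose delegate receives a `CancellationToken` and simply ignores the token, since `TextReader.ReadToEndAsync()` in .NET 4.5 has no token parameter.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

public static class StartAsyncDemo
{
    public static string ReadAll(string text)
    {
        return Observable.Using(
                () => new StringReader(text),
                // Overload taking Func<CancellationToken, Task<TResult>>; the token
                // is ignored because ReadToEndAsync() here accepts none.
                reader => Observable.StartAsync(ct => reader.ReadToEndAsync()))
            .Wait(); // block for the single result (fine for a demo, not for UI code)
    }

    public static void Main()
    {
        Console.WriteLine(ReadAll("hello"));
    }
}
```

Observable.FromAsync, used in the question, accepts the same kinds of delegates but defers invoking them until subscription, whereas StartAsync starts the task as soon as it is called; here that call already happens inside the Using factory at subscription time.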