
After quite some thought and rummaging around on the interwebs, I've come up with (what I think is) quite a nifty encapsulation of turning TcpIp into an Observable stream of data:

public static IObservable<string> CreateFromTcpIp(IPAddress ip, int port)
{
    return 
        Observable.Using(
            () => new TcpClient(),                               //create client
            client => client.ConnectObservable(ip, port)         //connect async
                      .Select(_ => client.GetStream())            // get stream
                      .Select(stream => new BinaryReader(stream)) // get reader 
                      .SelectMany(reader => reader.ToObservable()) // see below
                      .Retry()                                // if error, retry 
            )
            .Publish().RefCount(); //only subscribe once for all subscribers
}

Where ConnectObservable is just client.ConnectAsync(ip, port).ToObservable(), and reader.ToObservable looks like this:

private static IObservable<string> ToObservable(this BinaryReader source)
{
   return Observable.Create<string>(
       observer =>
       {
           return Observable.Generate(
               source,
               reader => true, //just keep on going
               reader => reader,
               reader => reader.ReadString())
           .Subscribe(observer);
       });
}

The problem I'm facing is how to go about testing this. I don't want to implement/wrap interfaces all the way down that call stack.

I've been trying to create a 'server' using TcpListener inside my test class, but I keep running into various (possibly unrelated) problems getting the subscribing and serving to co-exist: subscribing to the results of my CreateFromTcpIp never seems to return the IDisposable, and inserting delays in my tests seems to end up locking the assembly so I can't rebuild.

Is there an alternative approach that I'm missing?

EDIT The problem of Subscribe not returning is related to doing a Retry on a faulting Observable. I've asked another question here.

Benjol
  • Did you consider using a Subject (or ISubject) instead of an IObservable? It's great for unit testing. – Gayot Fow Nov 25 '13 at 13:56
  • @GarryVass, I'm not sure what you mean, where would you stick it? – Benjol Nov 25 '13 at 14:16
  • I would add it as the first argument of your CreateFromTcpIp and use its OnNext to unit test against, and also listen to it in your client classes. Unit testing becomes a snap, and you've got a scalable solution by proxy :) – Gayot Fow Nov 25 '13 at 14:26
  • @GarryVass, sorry, I still don't get it. – Benjol Nov 25 '13 at 14:45
  • I could post some loosely sketched prototype code to show the concept if that's of any use. It would compile, but need you to fill in the blanks for your TCP stuff... – Gayot Fow Nov 25 '13 at 15:39
  • First, take out the `Retry` until you work out your unit testing. Second, post the unit test code where you tried to set up a `TcpListener` and where you tried to test your method against it. – Brandon Nov 25 '13 at 17:47
  • So... it might just be me, but this doesn't smell good? I mean, there seem to be a lot of assumptions here... Like, if the stream errors, we should just keep retrying forever. – cwharris Nov 25 '13 at 20:56
  • And if it doesn't error too... that BinaryReader ToObservable will throw an error if the stream finishes, because it will try to read past the end of the stream. – James World Nov 25 '13 at 21:03
  • @ChristopherHarris. Yes, I suspect that this code is just too clever for its own good. And that's exactly why I wanted to test it, to see what happens in all these cases. In this particular case, I'm receiving data from a 'sensor', so I *do* want it to 'try forever'. Other code will check to see if 'nothing is coming', and call the sensor to reset the connection (that's the intention anyway). – Benjol Nov 26 '13 at 05:52
  • @Brandon, it's rather long, and scruffy. So I've put it [here](http://pastebin.com/uuAhBHV1) instead. – Benjol Nov 26 '13 at 07:08
  • Can't see your pastebin (blocked at work), but this Retry should complete subscribing quickly: it will Subscribe (indirectly) to the async Connect attempt on the same thread its observable fails on. Is that call failing synchronously? Is the new stream empty and ReadString() is blocking? You can insert Do() calls between Rx operators to spy on them, which may help. – James World Nov 26 '13 at 14:34
  • Here's a handy function to help with testing/debugging: http://stackoverflow.com/questions/20220755/how-can-i-see-what-my-reactive-extensions-query-is-doing/20220756#20220756 – James World Nov 26 '13 at 15:07
  • @JamesWorld, think I've tracked it down. In the case where the Tcp is NOT responding, it's the ConnectAsync (or the ToObservable behind that) which is not 'telling me' about the error (it only tells me when I *unsubscribe* - not helpful). In the case where it IS responding, I needed to do a `SubscribeOn` to free the main thread up so it could return me the subscription. Thanks for your help! – Benjol Nov 27 '13 at 07:28
  • Good stuff, actually you reminded me I was going to amend my answer to your question on `Retry` with a comment on `SubscribeOn` - I'll do that now. – James World Nov 27 '13 at 07:41
  • @JamesWorld, actually scratch what I said about ConnectAsync not telling me about the error. I wasn't observing it because my Subscribe didn't define a handler... – Benjol Nov 27 '13 at 07:49

2 Answers


I suppose you could test the BinaryReader ToObservable like this (if you make it accessible). The fact that this expects an exception possibly highlights some issues, although I wrote it to pass the code as written (uses nunit):

[Test]
public void TestStreamToObservable()
{

    var expectedText = new List<string>
    {
        "A good test is simple.",
        "A rolling stone gathers no moss.",
        "Test properly"
    };

    var stream = new MemoryStream();
    var writer = new BinaryWriter(stream);
    expectedText.ForEach(writer.Write);
    writer.Flush();

    stream.Seek(0, SeekOrigin.Begin);
    var reader = new BinaryReader(stream);

    var resultText = new List<string>();

    Assert.Throws<EndOfStreamException>(
        () => reader.ToObservable().Subscribe(resultText.Add));

    CollectionAssert.AreEquivalent(expectedText, resultText);
}

As for the rest, you've kind of scuppered yourself for anything but integration testing by creating a new TcpClient(). A new of anything concrete like that will always spell trouble for testing. You could refactor and pull the Retry().Publish().RefCount() out into a separate method that operates on IObservable<string>. Then leave the TcpClient bits to integration testing only.
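
The refactoring suggested above could look roughly like this. It is only a sketch: the class name `StreamPolicies` and the method name `RetryAndShare` are made up for illustration, and the policy is simply the `Retry().Publish().RefCount()` chain from the question, lifted out so that it can be exercised against any `IObservable<string>` without a socket.

```csharp
using System;
using System.Reactive.Linq;

public static class StreamPolicies
{
    // Hypothetical helper: applies the question's retry-and-share policy
    // to any source, so tests can feed it a fake observable.
    public static IObservable<string> RetryAndShare(this IObservable<string> source)
    {
        return source
            .Retry()      // resubscribe to the source whenever it signals OnError
            .Publish()    // share one underlying subscription...
            .RefCount();  // ...connected while at least one subscriber exists
    }
}
```

A unit test can then pass in something like an `Observable.Defer` that fails a couple of times before producing values, while the `TcpClient` wiring stays in integration tests.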

James World
  • I know what you mean about newing a TcpClient, but if I pass that in, shouldn't I also pass in the (implicit) new Stream and new BinaryReader? At which point, there's not much left to test (maybe that's the point?). But it's the cascaded connection/stream errors that I wanted to test my code against... – Benjol Nov 26 '13 at 05:54
  • Strange, I previously had `reader.BaseStream.Position != reader.BaseStream.Length` instead of simply `true` as the completed condition, and it threw at me, so I put true in. I've tried putting it back, and in isolation (as you suggest with this test) it seems ok. Thanks for this. – Benjol Nov 26 '13 at 06:02
  • Careful with that, it won't work with an underlying Stream that doesn't support Length; the real stream won't. – James World Nov 26 '13 at 06:05
  • In fact, that neatly highlights the difficulty of testing sockets. There's a wonderful course on pluralsight.com by Michael Perry called "Provable Code"; in the Patterns chapter he rebuilds the Socket API - it's a highly instructive look at everything wrong with how it was built. TcpClient is better but still problematic. I was thinking you'd probably be best off building a method that accepted a Factory function that returned a disposable object with a single BinaryReader property. Then you could mock all the error points, and encapsulate the TcpClient goo. – James World Nov 26 '13 at 06:16
  • Thanks. As you can tell, in this particular domain (networky stuff), I'm blundering through as best I can. I'm going to try the factory idea though. – Benjol Nov 26 '13 at 06:30
  • I've ended up creating an `IConnectable` with `Task ConnectAsync()` and `IStringReader GetReader()`. With the other finer details in my answer here, I've got something that works (at least in my unit tests). Thanks again. – Benjol Nov 28 '13 at 12:52
  • Good stuff. Did you see this? http://stackoverflow.com/questions/20220755/how-can-i-see-what-my-reactive-extensions-query-is-doing Might be useful in your testing. – James World Nov 28 '13 at 13:55
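
For readers following the comments above, the `IConnectable` abstraction might be sketched as below. Only the two members `Task ConnectAsync()` and `IStringReader GetReader()` come from the comment; the `ReadString` member on `IStringReader`, the `IDisposable` base (assumed so the object can be handed to `Observable.Using`), and the `FakeConnectable` test double are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

// Assumed shape: the pipeline only ever needs to read strings.
public interface IStringReader
{
    string ReadString();
}

// ConnectAsync and GetReader are named in the comment above;
// deriving from IDisposable is an assumption.
public interface IConnectable : IDisposable
{
    Task ConnectAsync();
    IStringReader GetReader();
}

// Hypothetical in-memory fake: serves canned messages, then fails
// the way a BinaryReader does at the end of its stream.
public sealed class FakeConnectable : IConnectable, IStringReader
{
    private readonly Queue<string> _messages;

    public FakeConnectable(IEnumerable<string> messages)
    {
        _messages = new Queue<string>(messages);
    }

    public Task ConnectAsync()
    {
        return Task.FromResult(0); // already "connected"
    }

    public IStringReader GetReader()
    {
        return this;
    }

    public string ReadString()
    {
        if (_messages.Count == 0)
            throw new EndOfStreamException();
        return _messages.Dequeue();
    }

    public void Dispose() { }
}
```

A fake along these lines makes it possible to script connection and read failures in a unit test, which is what the concrete `TcpClient` prevented.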

OK, for posterity, here are some of the things I've learnt while working through this problem. Mostly it's about the subtleties of using Rx:

  • If you don't observe OnError, it will throw as an exception.
  • On top of that, if you've done a ToObservable on a Task and you don't observe OnError, it will only throw when you unsubscribe.
  • Also, there's no point doing a Retry downstream from a Task: once it's thrown an exception, that's the Result, and it won't try again (took me a while to work that out!).
  • In the case where my subscription was never returning, I needed to SubscribeOn a different thread (Scheduler.Default).
  • Another bug was hinted at in James' answer: the ToObservable on the BinaryReader throws an exception. What is important here is that it is an Exception, not an OnError. So there's no Retry possible.

(Little code snippet to prove this to ourselves):

Observable.Generate(0, i => i < 4, i => i + 1, 
                    i => { if(i > 2) throw new Exception("sorry"); return i; })
          .Subscribe(i => Console.WriteLine(i), 
                     ex => Console.WriteLine(ex.Message));

Given this, I've recoded my ToObservable to use this (not so pretty):

return Observable.Create<string>(obs =>
{
    var subscribed = true;
    while (subscribed)
    {
        string result;
        try
        {
            result = source.ReadString();
        }
        catch (Exception ex)
        {
            // Report the failure and stop, so that nothing is
            // pushed to the observer after OnError.
            obs.OnError(ex);
            break;
        }
        obs.OnNext(result);
    }
    // Note: this is only reached once the loop above has exited.
    return Disposable.Create(() => subscribed = false);
});
  • For further development, I wanted a Retry that was slightly less aggressive. This led me to this excellent answer. (It seems that no matter what I manage to write in Rx, there is always a more elegant way to do it.)
  • James' Spy extension method was very helpful. Thoroughly recommended.
Benjol
  • Great post-mortem. I never thought about `Retry` not working on a Task - in fact clearly *any* operator attempting a re-subscription would fail - and this behaviour isn't exposed in the `IObservable`, requiring you to be careful how such observables are passed around. This isn't a unique problem in Rx though, I do see plenty of examples of observables designed to be subscribed to only once. – James World Dec 04 '13 at 17:36
  • `Observable.FromAsync` is what you need to make a Task restart on each subscription. – James World Dec 06 '13 at 06:26
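
As a rough illustration of that last comment, `ConnectObservable` could be written with `Observable.FromAsync` so that each subscription (including every resubscription made by `Retry`) starts a fresh connect attempt rather than replaying one already-faulted Task. The extension class name is invented here, and whether a given `TcpClient` instance can usefully be reconnected after a failure is a separate matter that this sketch does not address.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Reactive;
using System.Reactive.Linq;

public static class TcpClientObservableExtensions
{
    // Sketch: the lambda is invoked once per subscription, so a Retry
    // downstream triggers a new ConnectAsync call each time.
    public static IObservable<Unit> ConnectObservable(
        this TcpClient client, IPAddress ip, int port)
    {
        return Observable.FromAsync(() => client.ConnectAsync(ip, port));
    }
}
```

By contrast, `client.ConnectAsync(ip, port).ToObservable()` starts the Task once, up front, and every later subscriber just sees that single Task's outcome.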