After quite some thought and rummaging around on the interwebs, I've come up with (what I think is) quite a nifty encapsulation of turning TcpIp into an Observable stream of data:
public static IObservable<string> CreateFromTcpIp(IPAddress ip, int port)
{
return
Observable.Using(
() => new TcpClient(), //create client
client => client.ConnectObservable(ip, port) //connect async
.Select(_ => client.GetStream()) // get stream
.Select(stream => new BinaryReader(stream)) // get reader
.SelectMany(reader => reader.ToObservable()) // see below
.Retry() // if error, retry
)
.Publish().RefCount(); //only subscribe once for all subscribers
}
Where ConnectObservable
is just client.ConnectAsync(host, port).ToObservable()
, and reader.ToObservable
looks like this:
private static IObservable<string> ToObservable(this BinaryReader source)
{
return Observable.Create<string>(
observer =>
{
return Observable.Generate(
source,
reader => true, //just keep on going
reader => reader,
reader => reader.ReadString())
.Subscribe(observer);
});
}
The problem I'm facing is how to go about testing this. I don't want to implement/wrap interfaces all the way down that call stack.
I've been trying to create a 'server' using TcpListener inside my test class, but am coming up against various (possibly unrelated) problems getting the subscribing and serving to co-exist (Subscribing to the the results of my CreateFromTcpIp never seems to return the IDisposable, inserting delays in my tests seem to end up locking the assembly so I can't rebuild).
Is there an alternative approach that I'm missing?
EDIT The problem of Subscribe not returning is related to doing a Retry on a faulting Observable. I've asked another question here.