there are a tonne of built up tasks still awaiting... which then throw a tonne of exceptions due to the connection not being available
Yes, that's what I would expect with this code. And there's nothing really wrong with that, since each of those sends is in fact failing to send. If your code works fine with this, then you might just want to keep it as-is.
Otherwise...
(the _connection.IsAvailable has already been checked).
Yes. Connection.IsAvailable is useless. So are Socket.Connected / TcpClient.Connected, for that matter. They're all useless because all they tell you is whether an error has already occurred - which you, er, already know, because the last call already threw an exception. They do not provide any guarantee, or even a guess, as to whether the next method call will succeed. This is why you need other mechanisms to detect socket connection failure.
I have no issue waiting for the _connection.Send(data) to complete before moving onto the next element in the observable sequence. In fact, that would probably be preferable.
If Connection is a simple wrapper around a socket without a write queue, then you should definitely only perform one call to Send at a time. This is due to the fact that in resource-constrained scenarios (i.e., always in production, not on your dev box), a "write" operation for a socket may only write some of the bytes to the actual network stream. I assume your Connection wrapper is handling partial writes by continuing to write until the entire data buffer is sent. This works great unless the code calls Send multiple times - in which case you can end up with bytes being out of order (A and then B are written; A partially completes and the wrapper sends the rest of A in another write... after B).
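To make that interleaving concrete, here is a small self-contained simulation (not your actual Connection class): a fake "socket" that accepts only two bytes per write, and a naive send loop that yields between writes the way overlapping sends would on a real socket:

```csharp
using System;
using System.Collections.Generic;
using System.Text;

class InterleaveDemo
{
    // Simulated "wire" and a fake socket write that, like a real
    // socket under pressure, accepts at most 2 bytes per call.
    static readonly List<byte> Wire = new List<byte>();
    static int PartialSend(byte[] data, int offset, int count)
    {
        int n = Math.Min(2, count);
        for (int i = 0; i < n; i++) Wire.Add(data[offset + i]);
        return n;
    }

    // Naive wrapper: loops until the whole buffer is written,
    // yielding between writes (where another Send could run).
    static IEnumerator<object> Send(byte[] data)
    {
        int offset = 0;
        while (offset < data.Length)
        {
            offset += PartialSend(data, offset, data.Length - offset);
            yield return null;
        }
    }

    static void Main()
    {
        // Two overlapping sends, stepped in alternation.
        var a = Send(Encoding.ASCII.GetBytes("AAAA"));
        var b = Send(Encoding.ASCII.GetBytes("BBBB"));
        bool aMore = true, bMore = true;
        while (aMore || bMore)
        {
            if (aMore) aMore = a.MoveNext();
            if (bMore) bMore = b.MoveNext();
        }
        // A's and B's bytes end up interleaved on the wire.
        Console.WriteLine(Encoding.ASCII.GetString(Wire.ToArray()));
    }
}
```

This prints AABBAABB - neither message arrives intact, even though each Send call individually "handled" its partial writes correctly.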
So, you'll need a write queue for reliable operation. If Connection already provides one, then I'd say you don't need to do anything else; the multiple Sends failing are normal. But if Connection only handles sending that single data buffer and does not queue up its write requests, then you'll need to do that yourself.
This is most easily accomplished by using a TPL Dataflow block - specifically, ActionBlock<T>:
// Define the source observable.
var obs = ReadDatabase().SubscribeOn(NewThreadScheduler.Default);
// Create our queue which calls Send for each observable item.
var queue = new ActionBlock<string>(data => _connection.Send(data));
try
{
    // Subscribe the queue to the observable and (asynchronously) wait for it to complete.
    using (var subscription = obs.Subscribe(queue.AsObserver()))
        await queue.Completion;
}
catch (Exception ex)
{
    // The first exception thrown from Send will end up here.
    Console.WriteLine(ex.Message);
}
Dataflow blocks understand asynchronous code, and by default they only process one item at a time. So, this code will invoke Send one at a time, buffering up additional data items in a FIFO queue until that Send completes.
Dataflow blocks have a "fail fast" behavior, so the first Send that throws will fault the block, causing it to discard all remaining queued writes. When the block faults, await queue.Completion will throw, unsubscribing from the observable and displaying the message.
If the observable completes, then await queue.Completion will complete normally, again unsubscribing from the observable, and execution continues.
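Here is a small self-contained sketch of that fail-fast behavior (using plain Post calls instead of an observable, and a stand-in for Send; requires the System.Threading.Tasks.Dataflow package):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class FailFastDemo
{
    static async Task Main()
    {
        int processed = 0;
        // Stand-in for _connection.Send: item 2 "fails to send".
        var block = new ActionBlock<int>(i =>
        {
            if (i == 2) throw new InvalidOperationException("send failed");
            processed++;
        });

        for (int i = 0; i < 5; i++) block.Post(i);
        block.Complete();

        try
        {
            await block.Completion;
        }
        catch (InvalidOperationException ex)
        {
            // Items 0 and 1 were sent; the failure on item 2 faulted
            // the block, so items 3 and 4 were discarded, not attempted.
            Console.WriteLine($"{ex.Message}: processed {processed}");
        }
    }
}
```

This prints "send failed: processed 2" - only one exception surfaces, rather than one per queued item.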
For more about interfacing Rx with TPL Dataflow, see my Concurrency in C# Cookbook, recipe 7.7. You may also find this Stack Overflow answer helpful in understanding why passing an async lambda to Subscribe isn't ideal.