
I have an observable which wraps a data source, which it continually watches and spits out changes as they occur:

IObservable<String> ReadDatasource()
{
    //returns data from the data source, and watches it for changes
}

and within my code I have a TCP connection

interface Connection
{
    Task Send(String data);
    Boolean IsAvailable { get; }
}

which subscribes to the observable:

Connection _connection;

ReadDatasource()
    .SubscribeOn(NewThreadScheduler.Default)
    .Subscribe(
        onNext: async r =>
        {
            if (_connection.IsAvailable)
            {
                try
                {
                    await _connection.Send(r);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        });

If the connection is closed by the client while the observable is spitting out large volumes of data in quick succession, there are a tonne of built-up tasks still awaiting (at least I think this is the case), which then throw a tonne of exceptions due to the connection not being available (_connection.IsAvailable has already been checked). FWIW, I do not have the ability to make changes inside the _connection.Send(data) method. I have no issue waiting for _connection.Send(data) to complete before moving on to the next element in the observable sequence. In fact, that would probably be preferable.

Is there a simple Rx style of handling this case?

AaronHS
  • You probably need to use `Observable.Using` - it's designed to keep an `IDisposable` resource open while the observable is running and then shut down the resource when the observable ends. – Enigmativity Aug 03 '17 at 10:28

1 Answer


there are a tonne of built-up tasks still awaiting... which then throw a tonne of exceptions due to the connection not being available

Yes, that's what I would expect with this code. And there's nothing really wrong with that, since each one of those sends is in fact failing to send. If your code works fine with this, then you might just want to keep it as-is.

Otherwise...

(_connection.IsAvailable has already been checked).

Yes. Connection.IsAvailable is useless. So are Socket.Connected / TcpClient.Connected, for that matter. They're all useless because all they tell you is whether an error has already occurred. Which you, er, already know because the last call already threw an exception. They do not provide any guarantee or even a guess as to whether the next method will succeed. This is why you need other mechanisms to detect socket connection failure.

I have no issue waiting for _connection.Send(data) to complete before moving on to the next element in the observable sequence. In fact, that would probably be preferable.

If Connection is a simple wrapper around a socket without a write queue, then you should definitely only perform one call to Send at a time. This is because in resource-constrained scenarios (i.e., always in production, not on your dev box), a "write" operation on a socket may only write some of the bytes to the actual network stream. I assume your Connection wrapper handles partial writes by continuing to write until the entire data buffer is sent. That works great unless the code calls Send multiple times concurrently, in which case the bytes can end up out of order (A and then B are written; A only partially completes, and the wrapper sends the rest of A in another write... after B).
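
To make that hazard concrete, here is a hypothetical naive wrapper (not your Connection, just a sketch of what such a wrapper typically does with a raw Socket) showing how two overlapping calls can interleave their partial writes:

// Illustration only; assumes a raw System.Net.Sockets.Socket.
async Task SendAllAsync(Socket socket, byte[] buffer)
{
    var offset = 0;
    while (offset < buffer.Length)
    {
        // SendAsync may transmit fewer bytes than requested under load.
        offset += await socket.SendAsync(
            new ArraySegment<byte>(buffer, offset, buffer.Length - offset),
            SocketFlags.None);
    }
    // If two calls to SendAllAsync overlap, their loop iterations can interleave,
    // so the remainder of message A may hit the wire after message B.
}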

So, you'll need a write queue for reliable operation. If Connection already provides one, then I'd say you don't need to do anything else; multiple failing Sends are normal. But if Connection only handles sending that single data buffer and does not queue up its write requests, then you'll need to do that yourself.
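
For a sense of what "doing that yourself" involves, here is a hypothetical hand-rolled write queue built on System.Threading.Channels (the names channel and DrainAsync are illustrative, not part of the answer); the Dataflow approach below achieves the same thing with less code:

// Requires the System.Threading.Channels package.
var channel = Channel.CreateUnbounded<string>();

// Producer side: the observable only enqueues; it never touches the connection.
ReadDatasource()
    .SubscribeOn(NewThreadScheduler.Default)
    .Subscribe(
        data => channel.Writer.TryWrite(data),
        () => channel.Writer.Complete());

// Consumer side: a single loop performs one Send at a time, in FIFO order.
async Task DrainAsync()
{
    while (await channel.Reader.WaitToReadAsync())
    {
        while (channel.Reader.TryRead(out var data))
        {
            await _connection.Send(data);
        }
    }
}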

This is most easily accomplished by using a TPL Dataflow block. Specifically, ActionBlock<T>:

// Requires the System.Threading.Tasks.Dataflow package (using System.Threading.Tasks.Dataflow;).
// Define the source observable.
var obs = ReadDatasource().SubscribeOn(NewThreadScheduler.Default);

// Create our queue which calls Send for each observable item.
var queue = new ActionBlock<string>(data => _connection.Send(data));

try
{
  // Subscribe the queue to the observable and (asynchronously) wait for it to complete.
  using (var subscription = obs.Subscribe(queue.AsObserver()))
    await queue.Completion;
}
catch (Exception ex)
{
  // The first exception thrown from Send will end up here.
  Console.WriteLine(ex.Message);
}

Dataflow blocks understand asynchronous code, and by default they only process one item at a time. So, this code will invoke Send one at a time, buffering up additional data items in a FIFO queue until that Send completes.
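
To spell that default out (purely illustrative; the block above already behaves this way), you can pass ExecutionDataflowBlockOptions explicitly:

// MaxDegreeOfParallelism defaults to 1, so this is equivalent to the block above.
var queue = new ActionBlock<string>(
    data => _connection.Send(data),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });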

Dataflow blocks have a "fail fast" behavior, so the first Send that throws will fault the block, causing it to discard all remaining queued writes. When the block faults, await queue.Completion will throw, unsubscribing from the observable and displaying the message.
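
If you would rather log each failure and keep draining the queue instead of failing fast (a sketch, only sensible if a failed Send doesn't necessarily mean the connection is gone), move the try/catch inside the block's delegate so the block itself never faults:

var queue = new ActionBlock<string>(async data =>
{
    try
    {
        await _connection.Send(data);
    }
    catch (Exception ex)
    {
        // Handle the per-item failure here; the block keeps processing later items.
        Console.WriteLine(ex.Message);
    }
});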

If the observable completes, then await queue.Completion will complete, again unsubscribing from the observable, and continue execution normally.

For more about interfacing Rx with TPL Dataflow, see my Concurrency in C# Cookbook, recipe 7.7. You may also find this Stack Overflow answer helpful in understanding why passing an async lambda to Subscribe isn't ideal.
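
For comparison only (this is not part of the answer above), the same one-Send-at-a-time behavior can also be expressed in pure Rx by wrapping each Send in an inner observable and concatenating, which avoids putting an async lambda in Subscribe:

var subscription = ReadDatasource()
    .SubscribeOn(NewThreadScheduler.Default)
    .Select(data => Observable.FromAsync(() => _connection.Send(data)))
    .Concat()   // the next Send does not start until the previous one completes
    .Subscribe(
        _ => { },                               // nothing to do per successful send
        ex => Console.WriteLine(ex.Message));   // the first failure ends the sequence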

Stephen Cleary
  • Great answer. I knew the name was familiar because I already have your book! Believe it or not, I just opened my Kindle to see where I was up to before I got distracted by "Introduction to Rx" (I needed to get up to speed in a hurry) and I'm up to "7.3 Async Wrappers for Anything". – AaronHS Aug 03 '17 at 23:46