I'm implementing a data link layer using a producer/consumer pattern. The data link layer has its own thread and state machine to communicate the data link protocol over the wire (Ethernet, RS-232...). The interface to the physical layer is represented as a System.IO.Stream. Another thread writes messages to and reads messages from the data link object.

The data link object has an idle state that must wait for one of four conditions:

  1. A byte is received
  2. A message is available from the network thread
  3. The keep-alive timer has expired
  4. All communication was cancelled by the network layer

I'm having a difficult time figuring out the best way to do this without splitting up communication into a read/write thread (thereby significantly increasing the complexity). Here's how I can get 3 out of 4:

// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
stream.ReadTimeout = 10000;
await stream.ReadAsync(buf, 0, 1, cts.Token);

or

BlockingCollection<byte[]> SendQueue = new ...;
...
// Check for a message from network layer. Timeout after 10 seconds.
// Monitor cancellation token.
SendQueue.TryTake(out msg, 10000, cts.Token);

What should I do to block the thread, waiting for all four conditions? All recommendations are welcome. I'm not set on any architecture or data structures.

EDIT: ******** Thanks for the help everyone. Here's my solution ********

First, I didn't think there was an asynchronous implementation of the producer/consumer queue, so I implemented something similar to this Stack Overflow post.

I needed an external and internal cancellation source to stop the consumer thread and cancel the intermediate tasks, respectively, similar to this article.

byte[] buf = new byte[1];
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
{
    CancellationToken internalToken = internalTokenSource.Token;
    CancellationToken stopToken = stopTokenSource.Token;
    using (CancellationTokenSource linkedCts =
        CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken))
    {
        CancellationToken ct = linkedCts.Token;
        Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct);
        Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);
        Task keepAliveTask = Task.Delay(m_keepAliveTime, ct);

        // Wait for at least one task to complete
        await Task.WhenAny(readTask, msgTask, keepAliveTask);

        // Next cancel the other tasks
        internalTokenSource.Cancel();
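        try {
            await Task.WhenAll(readTask, msgTask, keepAliveTask);
        } catch (OperationCanceledException) {
            // The exception carries the linked token, so test the stop token directly
            if (stopToken.IsCancellationRequested)
                throw;
        }

        // Canceled tasks also report IsCompleted, so test for successful completion
        if (msgTask.Status == TaskStatus.RanToCompletion)
        {
            // Send the network layer message
        }
        else if (readTask.Status == TaskStatus.RanToCompletion)
        {
            // Process the byte from the physical layer
        }
        else
        {
            Contract.Assert(keepAliveTask.Status == TaskStatus.RanToCompletion);
            // Send a keep alive message
        }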
    }
}
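
The queue behind m_sendQueue.DequeueAsync is not shown above. As a rough sketch only, an asynchronous queue of that shape could be built from a ConcurrentQueue<T> and a SemaphoreSlim. The class name AsyncQueue<T> and its members are hypothetical and are not taken from the linked post.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical minimal async producer/consumer queue (illustration only).
public sealed class AsyncQueue<T>
{
    private readonly ConcurrentQueue<T> m_items = new ConcurrentQueue<T>();
    private readonly SemaphoreSlim m_available = new SemaphoreSlim(0);

    // Producer side: store the item first, then release one permit for it.
    public void Enqueue(T item)
    {
        m_items.Enqueue(item);
        m_available.Release();
    }

    // Consumer side: wait for a permit without blocking a thread.
    // If ct is canceled while waiting, OperationCanceledException is thrown
    // and no item is removed from the queue.
    public async Task<T> DequeueAsync(CancellationToken ct)
    {
        await m_available.WaitAsync(ct).ConfigureAwait(false);
        T item;
        m_items.TryDequeue(out item); // a permit implies an item is present
        return item;
    }
}
```

Because a canceled DequeueAsync does not take an item, canceling msgTask after WhenAny should not lose a queued message. The same is not guaranteed for the canceled stream read.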
Brian Heilig
  • [`await Task.WhenAny(...)`](https://msdn.microsoft.com/en-us/library/system.threading.tasks.task.whenany%28v=vs.110%29.aspx) might help. – Matthew Watson Mar 02 '16 at 14:22

3 Answers

3

I would go with your option two, waiting for any of the 4 conditions to happen. Assuming you have the 4 tasks as awaitable methods already:

var task1 = WaitForByteReceivedAsync();
var task2 = WaitForMessageAvailableAsync();
var task3 = WaitForKeepAliveTimerAsync();
var task4 = WaitForCommunicationCancelledAsync();

// now gather them
IEnumerable<Task<bool>> theTasks = new List<Task<bool>> {
    task1, task2, task3, task4
};

// Wait for any of the things to complete
var result = await Task.WhenAny(theTasks);

The code above will resume immediately after the first task completes, and ignore the other 3.

Note:

In the documentation for WhenAny, it says:

The returned task will always end in the RanToCompletion state with its Result set to the first task to complete. This is true even if the first task to complete ended in the Canceled or Faulted state.

So make sure to do that final check before trusting what happened:

if (await result)
... // 'result' is the task that finished first; awaiting it yields its bool, or rethrows if that task was canceled or faulted
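
To make the quoted behaviour concrete, here is a small sketch. The class and method names are made up for illustration. It passes an already-canceled task to WhenAny and reports the status of both the WhenAny task and the task it returns.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAnyDemo
{
    // Returns "<status of the WhenAny task>/<status of the first finished task>".
    public static async Task<string> DescribeAsync()
    {
        using (var cts = new CancellationTokenSource())
        {
            cts.Cancel();
            Task<bool> canceled = Task.FromCanceled<bool>(cts.Token);   // already canceled
            Task<bool> pending = new TaskCompletionSource<bool>().Task; // never completes

            Task<Task<bool>> whenAny = Task.WhenAny(canceled, pending);
            Task<bool> first = await whenAny; // does not throw, even though 'first' was canceled

            try
            {
                bool value = await first; // this await rethrows the cancellation
                return "unexpected value " + value;
            }
            catch (OperationCanceledException)
            {
                return whenAny.Status + "/" + first.Status;
            }
        }
    }
}
```

Calling WhenAnyDemo.DescribeAsync() should yield "RanToCompletion/Canceled": the outer task succeeds, and the failure only surfaces when the inner task is awaited.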
Pedro G. Dias
  • I think you meant WhenAny() – H H Mar 02 '16 at 14:33
  • No. WhenAny returns the moment any of the tasks returns. WhenAll awaits for every task given to complete, and returns in itself a Task (as opposed to WaitAll()) – Pedro G. Dias Mar 02 '16 at 14:35
  • This doesn't work because the result of the byte read might be discarded. I assume he cares about that data not being lost. – usr Mar 02 '16 at 14:36
  • So after a Cancel you still want to wait for a byte or a message to come in? And vice versa? – H H Mar 02 '16 at 14:36
  • Yeah, I think I want WhenAny. I want to wait for any task to finish, then figure out which task(s) finished and continue from there. – Brian Heilig Mar 02 '16 at 14:36
  • The original question says " The data link object has an idle state that must wait for four conditions" - in my head, that means "wait for all 4 things to happen". If you mean any of those 4 can happen, then of course, WaitAny() is what you want :) – Pedro G. Dias Mar 02 '16 at 14:37
  • Agreed that you could read it that way but that could never work. – H H Mar 02 '16 at 14:38
  • Ok, rewriting to WhenAny() to clarify the answer :) – Pedro G. Dias Mar 02 '16 at 14:40
2

In this case, I would only use cancellation tokens for cancellation. A repeated timeout like a keep-alive timer is better represented as a timer.

So, I would model this as three cancelable tasks. First, the cancellation token:

All communication was cancelled by the network layer

CancellationToken token = ...;

Then, three concurrent operations:

A byte is received

var readByteTask = stream.ReadAsync(buf, 0, 1, token);

The keep-alive timer has expired

var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token);

A message is available from the network thread

This one is a bit trickier. Your current code uses BlockingCollection<T>, which is not async-compatible. I recommend switching to TPL Dataflow's BufferBlock<T> or my own AsyncProducerConsumerQueue<T>, either of which can be used as async-compatible producer/consumer queues (meaning that the producer can be sync or async, and the consumer can be sync or async).

BufferBlock<byte[]> SendQueue = new ...;
...
var messageTask = SendQueue.ReceiveAsync(token);

Then you can use Task.WhenAny to determine which of these tasks completed:

var completedTask = await Task.WhenAny(readByteTask, keepAliveTimerTask, messageTask);

Now, you can retrieve results by comparing completedTask to the others and awaiting them:

if (completedTask == readByteTask)
{
  // Throw an exception if there was a read error or cancellation.
  await readByteTask;
  byte received = buf[0]; // 'byte' is a C# keyword and cannot be used as a variable name
  ...
  // Continue reading
  readByteTask = stream.ReadAsync(buf, 0, 1, token);
}
else if (completedTask == keepAliveTimerTask)
{
  // Throw an exception if there was a cancellation.
  await keepAliveTimerTask;
  ...
  // Restart keepalive timer.
  keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token);
}
else if (completedTask == messageTask)
{
  // Throw an exception if there was a cancellation (or the SendQueue was marked as completed)
  byte[] message = await messageTask;
  ...
  // Continue receiving messages
  messageTask = SendQueue.ReceiveAsync(token);
}
Stephen Cleary
  • Wish I had seen this earlier as I went and implemented my own AsyncQueue, albeit not as general as yours. Why do you await each individual task? Shouldn't it already be completed? – Brian Heilig Mar 09 '16 at 15:25
  • @BrianHeilig: It's necessary to retrieve results. These tasks have completed but their results haven't been observed. Results can be data, e.g., `messageTask`, but results can also be exceptions. `keepAliveTimerTask` can have an exception if it's canceled, and `readByteTask` can have an exception if it's canceled or if there's some I/O read error. `await` will (re-)raise those exceptions if present. – Stephen Cleary Mar 09 '16 at 23:48
1

Cancelling a read leaves you no way to know whether data was read or not. Cancelling and reading are not atomic with respect to each other. That approach only works if you close the stream after cancellation.

The queue approach is better. You can create a linked CancellationTokenSource that becomes cancelled whenever you want it to. Instead of passing cts.Token, you pass a token that you control.

You can then signal that token based on time, another token and any other event that you like. If you use the built-in timeout the queue will internally do the same thing to link the incoming token with a timeout.
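
A minimal sketch of this idea, under some assumptions: WakeReason, IdleWait, WaitOnce and wakeSource are hypothetical names, and wakeSource stands for a second CancellationTokenSource that other code cancels to interrupt the wait (for example, when a byte has arrived). The stop token and the wake token are linked, the linked token is passed to TryTake together with the timeout, and afterwards the individual tokens are inspected to see what happened.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public enum WakeReason { Message, Timeout, Stopped, Woken }

public static class IdleWait
{
    // Blocks until a message is taken, the timeout elapses, or either token is canceled.
    public static WakeReason WaitOnce(
        BlockingCollection<byte[]> sendQueue,
        CancellationToken stopToken,
        CancellationTokenSource wakeSource,
        int timeoutMs,
        out byte[] msg)
    {
        msg = null;
        using (var linked = CancellationTokenSource.CreateLinkedTokenSource(
                   stopToken, wakeSource.Token))
        {
            try
            {
                // TryTake returns false on timeout and throws on cancellation.
                return sendQueue.TryTake(out msg, timeoutMs, linked.Token)
                    ? WakeReason.Message
                    : WakeReason.Timeout;
            }
            catch (OperationCanceledException)
            {
                // Inspect the individual tokens to find out which one fired.
                return stopToken.IsCancellationRequested
                    ? WakeReason.Stopped
                    : WakeReason.Woken;
            }
        }
    }
}
```

A caller would typically create a fresh wakeSource for each wait, since a CancellationTokenSource cannot be reset once it has been canceled.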

usr
  • Sorry, I was a little short on detail. The cts is a CancellationTokenSource which is owned by the data link layer. Cancellation is also controlled by the data link layer; the network thread calls stop which cancels all communication and waits for the thread to complete. The goal is to gracefully and quickly stop the data link layer. – Brian Heilig Mar 02 '16 at 14:34
  • And why would this answer not work? You can make `TryTake` abort on *any* condition you like. – usr Mar 02 '16 at 14:35
  • I think I understand. You are suggesting I signal the cancel token when the network thread cancels or a byte is available, right? I guess I would need another thread to synchronously read from the stream, then signal the cancel token when the read is complete. I'll need a variable to determine what was canceled (read or stop). I'll also need to cancel the read thread once a message is taken, right? Or am I missing something? – Brian Heilig Mar 02 '16 at 14:40
  • I think you link the token, cancel the linked cts when "message is available from the network thread" and also provide a timeout. That should get you all 4 conditions and never lose data from the queue. – usr Mar 02 '16 at 14:43
  • After cancellation you can check which one happened by examining the `IsCancellationRequested` property of all tokens involved. – usr Mar 02 '16 at 14:43
  • Frankly, I don't fully understand the scenario but I'm confident that you can make any cancellation scheme work using linked tokens and TryTake. – usr Mar 02 '16 at 14:44