I'm implementing a data link layer using a producer/consumer pattern. The data link layer has its own thread and state machine to communicate the data link protocol over the wire (Ethernet, RS-232...). The interface to the physical layer is represented as a System.IO.Stream. Another thread writes messages to and reads messages from the data link object.
The data link object has an idle state that must wait for one of four conditions:
- A byte is received
- A message is available from the network thread
- The keep-alive timer has expired
- All communication was cancelled by the network layer
I'm having a difficult time figuring out the best way to do this without splitting up communication into a read/write thread (thereby significantly increasing the complexity). Here's how I can get 3 out of 4:
// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
stream.ReadTimeout = 10000;
await stream.ReadAsync(buf, 0, 1, cts.Token);
or
BlockingCollection<byte[]> SendQueue = new ...;
...
// Check for a message from network layer. Timeout after 10 seconds.
// Monitor cancellation token.
SendQueue.TryTake(out msg, 10000, cts.Token);
What should I do to block the thread, waiting for all four conditions? All recommendations are welcome. I'm not set on any architecture or data structures.
EDIT: ******** Thanks for the help everyone. Here's my solution ********
First I don't think there was an asynchronous implementation of the producer/consumer queue. So I implemented something similar to this stackoverflow post.
I needed an external and internal cancellation source to stop the consumer thread and cancel the intermediate tasks, respectively, similar to this article.
byte[] buf = new byte[1];
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
{
CancellationToken internalToken = internalTokenSource.Token;
CancellationToken stopToken = stopTokenSource.Token;
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken))
{
CancellationToken ct = linkedCts.Token;
Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct);
Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);
Task keepAliveTask = Task.Delay(m_keepAliveTime, ct);
// Wait for at least one task to complete
await Task.WhenAny(readTask, msgTask, keepAliveTask);
// Next cancel the other tasks
internalTokenSource.Cancel();
try {
await Task.WhenAll(readTask, msgTask, keepAliveTask);
} catch (OperationCanceledException e) {
if (e.CancellationToken == stopToken)
throw;
}
if (msgTask.IsCompleted)
// Send the network layer message
else if (readTask.IsCompleted)
// Process the byte from the physical layer
else
Contract.Assert(keepAliveTask.IsCompleted);
// Send a keep alive message
}
}