You could use the following idea, provided the criteria for flush is the number of packets (up to 1000). I did not test it. It makes use of Stephen Cleary's AsyncProducerConsumerQueue<T>
featured in this question.
AsyncProducerConsumerQueue<byte[]> _queue;
Stream _stream;
// producer
async Task ReceiveAsync(CancellationToken token)
{
while (true)
{
var list = new List<byte>();
while (true)
{
token.ThrowIfCancellationRequested(token);
var packet = await _device.ReadAsync(token);
list.Add(packet);
if (list.Count == 1000)
break;
}
// push next batch
await _queue.EnqueueAsync(list.ToArray(), token);
}
}
// consumer
async Task LogAsync(CancellationToken token)
{
Task previousFlush = Task.FromResult(0);
CancellationTokenSource cts = null;
while (true)
{
token.ThrowIfCancellationRequested(token);
// get next batch
var nextBatch = await _queue.DequeueAsync(token);
if (!previousFlush.IsCompleted)
{
cts.Cancel(); // cancel the previous flush if not ready
throw new Exception("failed to flush on time.");
}
await previousFlush; // it's completed, observe for any errors
// start flushing
cts = CancellationTokenSource.CreateLinkedTokenSource(token);
previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Count, cts.Token);
}
}
If you don't want to fail the logger but rather prefer to cancel the flush and proceed to the next batch, you can do so with a minimal change to this code.
In response to @l3arnon comment:
- A packet is not a byte, it's byte[]. 2. You haven't used the OP's ToHexString. 3. AsyncProducerConsumerQueue is much less robust and
tested than .Net's TPL Dataflow. 4. You await previousFlush for errors
just after you throw an exception which makes that line redundant.
etc. In short: I think the possible added value doesn't justify this
very complicated solution.
- "A packet is not a byte, it's byte[]" - A packet is a byte, this is obvious from the OP's code:
buffer[i] = await device.ReadAsync()
. Then, a batch of packets is byte[]
.
"You haven't used the OP's ToHexString." - The goal was to show how to use Stream.WriteAsync
which natively accepts a cancellation token, instead of WriteLineAsync
which doesn't allow cancellation. It's trivial to use ToHexString
with Stream.WriteAsync
and still take advantage of cancellation support:
var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) +
Environment.NewLine);
_stream.WriteAsync(hexBytes, 0, hexBytes.Length, token);
"AsyncProducerConsumerQueue is much less robust and tested than .Net's TPL Dataflow" - I don't think this is a determined fact. However, if the OP is concerned about it, he can use regular BlockingCollection
, which doesn't block the producer thread. It's OK to block the consumer thread while waiting for the next batch, because writing is done in parallel. As opposed to this, your TPL Dataflow version carries one redundant CPU and lock intensive operation: moving data from producer pipeline to writer pipleline with logAction.Post(packet)
, byte by byte. My code doesn't do that.
"You await previousFlush for errors just after you throw an exception which makes that line redundant." - This line is not redundant. Perhaps, you're missing this point: previousFlush.IsCompleted
can be true
when previousFlush.IsFaulted
or previousFlush.IsCancelled
is also true
. So, await previousFlush
is relevant there to observe any errors on the completed tasks (e.g., a write failure), which otherwise will be lost.