7

I'm learning about async/await patterns in C#. Currently I'm trying to solve a problem like this:

  • There is a producer (a hardware device) that generates 1000 packets per second. I need to log this data to a file.

  • The device only has a ReadAsync() method to report a single packet at a time.

  • I need to buffer the packets and write them in the order they are generated to the file, only once a second.

  • Write operation should fail if the write process is not finished in time when the next batch of packets is ready to be written.

So far I have written something like below. It works but I am not sure if this is the best way to solve the problem. Any comments or suggestion? What is the best practice to approach this kind of Producer/Consumer problem where the consumer needs to aggregate the data received from the producer?

static async Task TestLogger(Device device, int seconds)
{
    const int bufLength = 1000;
    bool firstIteration = true;
    Task writerTask = null;

    using (var writer = new StreamWriter("test.log")))
    {
        do
        {
            var buffer = new byte[bufLength][];

            for (int i = 0; i < bufLength; i++)
            {
                buffer[i] = await device.ReadAsync();
            }

            if (!firstIteration)
            {
                if (!writerTask.IsCompleted)
                    throw new Exception("Write Time Out!");
            }

            writerTask = Task.Run(() =>
                {
                    foreach (var b in buffer)
                        writer.WriteLine(ToHexString(b));
                });

            firstIteration = false;
        } while (--seconds > 0);
    }
}
noseratio
  • 59,932
  • 34
  • 208
  • 486
AlefSin
  • 1,086
  • 9
  • 20
  • So the operation should fail when there's another 1000 packets accumulated already, while the previous flush is still in progress? Or when it's been more than 1 second the start of the flash? – avo Jun 05 '14 at 10:13
  • Operation will fail if the file operation is not complete before the next batch of packets are ready. Essentially, this is a dual-buffering scheme: one buffer is being written to the file while a second buffer is being filled by the input. – AlefSin Jun 05 '14 at 10:15
  • 1
    What determines the readiness of the buffer: the number of packets, or the time frame? – avo Jun 05 '14 at 10:19
  • 1
    Essentially, it does not matter: this is a hardware device and generates 1000 packets every seconds at very regular intervals. – AlefSin Jun 05 '14 at 10:29
  • 1
    From a code-review perspective I have trouble verifying that your code is correct regarding race conditions. This is a sign that you haven't found a great design yet. Good code is simple to review. – usr Jun 05 '14 at 15:28
  • 2
    @usr Sure, but that is basically the question I am asking: the above code has the basic functionality. Now what is the best way to implement it in a clear, maintainable way? – AlefSin Jun 05 '14 at 16:44
  • 1
    @AlefSin I don't know. Just my impression. You're right in asking this question. – usr Jun 05 '14 at 17:04

3 Answers3

1

A better approach IMHO would be to have 2 "workers", a producer and a consumer. The producer reads from the device and simply fills a list. The consumer "wakes up" every second and writes the batch to a file.

List<byte[]> _data = new List<byte[]>();

async Task Producer(Device device)
{
    while (true)
    {
        _data.Add(await device.ReadAsync());
    }
}

async Task Consumer(Device device)
{
    using (var writer = new StreamWriter("test.log")))
    {
        while (true)
        {
            Stopwatch watch = Stopwatch.StartNew();

            var batch = _data;
            _data = new List<byte[]>();
            foreach (var packet in batch)
            {
                writer.WriteLine(ToHexString(packet));

                if (watch.Elapsed >= TimeSpan.FromSeconds(1))
                {
                    throw new Exception("Write Time Out!");
                }
            }

            await Task.Delay(TimeSpan.FromSeconds(1) - watch.Elapsed);
        }
    }
}

The while (true) should probably be replaced by a system wide cancellation token.

i3arnon
  • 113,022
  • 33
  • 324
  • 344
  • Thanks. I'll wait and see if anyone else has a different or better idea. If nothing happens I'll accept your answer since I feel it is less convoluted than my own solution. – AlefSin Jun 05 '14 at 09:43
1

You could use the following idea, provided the criteria for flush is the number of packets (up to 1000). I did not test it. It makes use of Stephen Cleary's AsyncProducerConsumerQueue<T> featured in this question.

AsyncProducerConsumerQueue<byte[]> _queue;
Stream _stream;

// producer
async Task ReceiveAsync(CancellationToken token)
{
    while (true)
    {
       var list = new List<byte>();
       while (true)
       {
           token.ThrowIfCancellationRequested(token);
           var packet = await _device.ReadAsync(token);
           list.Add(packet);
           if (list.Count == 1000)
               break;
       }
       // push next batch
       await _queue.EnqueueAsync(list.ToArray(), token);
    }
}

// consumer
async Task LogAsync(CancellationToken token)
{
    Task previousFlush = Task.FromResult(0); 
    CancellationTokenSource cts = null;
    while (true)
    {
       token.ThrowIfCancellationRequested(token);
       // get next batch
       var nextBatch = await _queue.DequeueAsync(token);
       if (!previousFlush.IsCompleted)
       {
           cts.Cancel(); // cancel the previous flush if not ready
           throw new Exception("failed to flush on time.");
       }
       await previousFlush; // it's completed, observe for any errors
       // start flushing
       cts = CancellationTokenSource.CreateLinkedTokenSource(token);
       previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Count, cts.Token);
    }
}

If you don't want to fail the logger but rather prefer to cancel the flush and proceed to the next batch, you can do so with a minimal change to this code.

In response to @l3arnon comment:

  1. A packet is not a byte, it's byte[]. 2. You haven't used the OP's ToHexString. 3. AsyncProducerConsumerQueue is much less robust and tested than .Net's TPL Dataflow. 4. You await previousFlush for errors just after you throw an exception which makes that line redundant. etc. In short: I think the possible added value doesn't justify this very complicated solution.
  1. "A packet is not a byte, it's byte[]" - A packet is a byte, this is obvious from the OP's code: buffer[i] = await device.ReadAsync(). Then, a batch of packets is byte[].
  2. "You haven't used the OP's ToHexString." - The goal was to show how to use Stream.WriteAsync which natively accepts a cancellation token, instead of WriteLineAsync which doesn't allow cancellation. It's trivial to use ToHexString with Stream.WriteAsync and still take advantage of cancellation support:

    var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + 
        Environment.NewLine);
    _stream.WriteAsync(hexBytes, 0, hexBytes.Length, token);
    
  3. "AsyncProducerConsumerQueue is much less robust and tested than .Net's TPL Dataflow" - I don't think this is a determined fact. However, if the OP is concerned about it, he can use regular BlockingCollection, which doesn't block the producer thread. It's OK to block the consumer thread while waiting for the next batch, because writing is done in parallel. As opposed to this, your TPL Dataflow version carries one redundant CPU and lock intensive operation: moving data from producer pipeline to writer pipleline with logAction.Post(packet), byte by byte. My code doesn't do that.

  4. "You await previousFlush for errors just after you throw an exception which makes that line redundant." - This line is not redundant. Perhaps, you're missing this point: previousFlush.IsCompleted can be true when previousFlush.IsFaulted or previousFlush.IsCancelled is also true. So, await previousFlush is relevant there to observe any errors on the completed tasks (e.g., a write failure), which otherwise will be lost.

Community
  • 1
  • 1
avo
  • 10,101
  • 13
  • 53
  • 81
  • Thanks @avo. The only question is, can the receiver get blocked at `_queue.EnqueueAsync()'? Since the producer is a hardware device, the receive task cannot be blocked to ensure no packet is lost. Otherwise I like your answer a lot. – AlefSin Jun 05 '14 at 12:46
  • @AlefSin, it's async, so it doesn't block a thread, by design. However, the `await _queue.EnqueueAsync()` continuation can be delayed until an async lock inside `AsyncProducerConsumerQueue` becomes available. I haven't studied Stephen's code, but I think it is a very fast operation, perhaps a few milliseconds. – avo Jun 05 '14 at 12:54
  • I guess I'll have to test and profile it then. The thing is we have precisely 1 millisecond between arrival of each packet (1000 packets per second) thus "few" milliseconds it not affordable. Still, in reality the code might work just fine. – AlefSin Jun 05 '14 at 12:57
  • @AlefSin, Windows is not a real-time OS. You still may be loosing individual packets with as high as 1000 samples per second rate, no matter what approach you take. – avo Jun 05 '14 at 13:05
  • True. Still, I have a C++ version of the same program running in user mode using the same underlying windows API to talk to the device and it works quite well. Over few hundred hours of recording I've never observed packet loss. Thus, yes, there are no guarantees but reaching some level confidence is possible. Edit: oh, I forgot, I even used another version in C# in the past based on events and delegates (again at 1 KHz) and it also has been very robust. – AlefSin Jun 05 '14 at 13:20
  • 1
    @avo, why do you need `AsyncProducerConsumerQueue` here? IMO, you'd be fine with standard blocking `BlockingCollection`, it may help reducing any added latency. Also, is `Packet` the same as `byte` in your code? – noseratio Jun 05 '14 at 22:10
  • 1
    @Noseratio, `BlockingCollection` would work too. `BlockingCollection.Take(token)` can be used instead of `await _queue.DequeueAsync(token)`. I replaced `Packet` with `byte` and further simplified the code. I'm leaving it up to @AlefSin to do performance tests and see if `BlockingCollection` works better. – avo Jun 06 '14 at 05:14
  • @I3arnon, is that you who downvoted this? If so, why? – avo Jun 06 '14 at 12:23
  • @avo No, and mentions don't notify unless a user commented before. But I can see some problems: 1. A packet is not a byte, it's byte[]. 2. You haven't used the OP's ToHexString. 3. AsyncProducerConsumerQueue is much less robust and tested than .Net's TPL Dataflow. 4. You await previousFlush for errors just after you throw an exception which makes that line redundant. etc. In short: I think the possible added value doesn't justify this very complicated solution. – i3arnon Jun 06 '14 at 16:45
  • @l3arnon, good to know it wasn't you. I didn't think you would downvote it either, after those multiple edits you made to your own answer based on my comments. Now, I addressed your concerns at the end of my answer, in much details, including what I think is a redundant copying of data in your version. – avo Jun 07 '14 at 01:51
  • @AlefSin may be interested to check this update, too. – avo Jun 07 '14 at 01:52
  • @avo I disagree with at least 3 of your answers (for example `buffer` is clearly a 2 dimensional array) but this isn't codereview.stackexchange.com and I feel this argument has ran its course... – i3arnon Jun 07 '14 at 14:18
  • 1
    @avo Sorry for my long delay. Finally after some tests and modifications to meet my other error handling needs, I decided that your answer provided the best building blocks for the solution. – AlefSin Jun 13 '14 at 07:20
1

Assuming you can batch by amount (1000) instead of time (1 second), the simplest solution is probably using TPL Dataflow's BatchBlock which automatically batches a flow of items by size:

async Task TestLogger(Device device, int seconds)
{
    var writer = new StreamWriter("test.log");
    var batch = new BatchBlock<byte[]>(1000);
    var logAction = new ActionBlock<byte[]>(
        packet =>
        {
            return writer.WriteLineAsync(ToHexString(packet));
        });
    ActionBlock<byte[]> transferAction;
    transferAction = new ActionBlock<byte[][]>(
        bytes =>
        {
            foreach (var packet in bytes)
            {
                if (transferAction.InputCount > 0)
                {
                    return; // or throw new Exception("Write Time Out!");
                }
                logAction.Post(packet);
            }
        }
    );

    batch.LinkTo(transferAction);
    logAction.Completion.ContinueWith(_ => writer.Dispose());

    while (true)
    {
        batch.Post(await device.ReadAsync());
    }
}
i3arnon
  • 113,022
  • 33
  • 324
  • 344
  • What if there's 1005 new packets already, but it's only been 995ms? Or only 995 packets, but 1005ms already elapsed? I think you should stick to either time or # of packets, but not both. – avo Jun 05 '14 at 11:58
  • @avo if there are 1005 new packets, 1000 of them will be written, If there are 995 they will not. the batching is based only on size. – i3arnon Jun 05 '14 at 12:02
  • My understanding is, @AlefSin want's to fail the write-out if the number of new pending packets is more than 1000. In which case, this approach doesn't work. That's how I see it, anyway. – avo Jun 05 '14 at 12:05
  • @avo That's not how i see it, but it's actually even simpler. Just query the [InputCount](http://msdn.microsoft.com/query/dev12.query?appId=Dev12IDEF1&l=EN-US&k=k(System.Threading.Tasks.Dataflow.ActionBlock`1.InputCount);k(TargetFrameworkMoniker-.NETFramework,Version%3Dv4.5.1);k(DevLang-csharp)&rd=true) – i3arnon Jun 05 '14 at 12:24
  • 1
    @I3arnon I have never used TPL before. This is a very interesting solution. I need to study it a bit since obviously my production quality code has other consideration to take into account. Overall, I like this approach a lot. – AlefSin Jun 05 '14 at 12:41
  • @l3arnon, what if `logAction.InputCount > 0` becomes `true` while you're executing `writer.WriteLine()`, which may be hanging because, for example, the network went down? I don't think using `await WriteLineAsync()` will help here, either. – avo Jun 05 '14 at 12:42
  • @I3arnon do you know if `batch.SendAsync()` is blocking or non-blocking? The device has no internal buffer so if new packet is received while the old one is not yet queued, there is a chance of packet loss. – AlefSin Jun 05 '14 at 12:48
  • @avo what about it? what network? test.log is a file. – i3arnon Jun 05 '14 at 12:48
  • @AlefSin it's blocking in async way. No thread is blocked, but it does wait for the transfer to end. You can use the Post method instead if you prefer a sync option. – i3arnon Jun 05 '14 at 12:50
  • 1
    @AlefSin TPL == task parallel library. You have been using it whenever you were using a task. TPL dataflow on the other hand, is unfortunately a less known library. – i3arnon Jun 05 '14 at 12:51
  • @l3arnon, no matter if it's a networked file or a local file. The I/O operation which takes place inside `writer.WriteLine()` can easily delay the next line of your code for a second or two. Then you'll loose two batches. My point is, the write operation should happen in *parallel* with observing the next batch. This is not what your code does. – avo Jun 05 '14 at 13:01
  • @avo your code doesn't either. CancellationToken doesn't magically stops execution, it needs to be checked. – i3arnon Jun 05 '14 at 13:07
  • @avo I updated once again, to alleviate your worries. Write operation will happen in **parallel**. – i3arnon Jun 05 '14 at 13:21
  • @l3arnon, in my code it actually does. There is a `Task.WhenAny` fork for fetching next batch and writing the previous one. – avo Jun 05 '14 at 13:21
  • @avo no it doesn't. After the `WhenAny` you await the `previousFlush`, which will only end when the code inside it (which you neglected to implement) checks the token parameter. – i3arnon Jun 05 '14 at 13:23
  • @l3arnon, you did spot a bug, fixing it made the code much simpler. I don't claim it's better than yours, but I think it's easier to understand, now that all edits have been done. – avo Jun 05 '14 at 13:45
  • @avo I strongly disagree, i think it's very hard to understand. For example, i think the bug is still in there, even after all the edits. the `await previousFlush` still waits. – i3arnon Jun 05 '14 at 14:17
  • @I3arnon Sorry for my long delay. I liked this solution a lot but after spending some time, I still had some problem with integrating other error handling situations (e.g. timeout, flushing the buffers on application exit, etc.) that I needed for my production version of the code. Still, there might be a clean way to make the error handling easier but at this stage I prefer to go with the more verbose answer. – AlefSin Jun 13 '14 at 07:22