TL;DR I have an application that is reading messages from a USB device in the background, and displaying the messages on the screen. I am using a BlockingCollection, as I need to read messages quickly so the device does not get a BufferOverflow.
I am reading messages like this (my producer):
private void ReadMessages(BlockingCollection<object> logMessages)
{
uint numMsgs;
Status status;
Message[] msgs = new Message[10];
while(!logMessages.IsAddingCompleted)
{
numMsgs = (uint) msgs.Length;
status = readMessages(channel, msgs, ref numMsgs, 1000);
if(status == Status.ERR_BUFFER_OVERFLOW)
{
logMessages.Add("BUFFER OVERFLOW - MESSAGES LOST!");
logMessages.Add(CopyMessages(msgs, numMsgs));
}
else if(status == Status.STATUS_NOERROR)
{
logMessages.Add(CopyMessages(msgs, numMsgs));
}
else
{
throw new Exception("Error");
}
}
The readMessages() method will fill the msgs
array with the Message
objects read, and the numMsgs
reference holds the number of messages that were read (up to 10). I use a function called CopyMessages()
so I only pass a Message[]
that is the right size. i.e. if 5 messages are read, I send a Message[5]
instead of Message[10]
.
I read the messages (my consumer) like this:
private void DisplayMessages(BlockingCollection<object> messages)
{
string[] msgs;
try
{
foreach (var item in messages.GetConsumingEnumerable(_cancellationTokenSource.Token))
{
if (item is string)
{
msgs = new string[] { item.ToString() };
}
else if (item is PassThruMsg[])
{
msgs = FormatMessages((PassThruMsg[])item);
}
else
{
msgs = new string[0];
}
Task.Factory.StartNew(new Action(() => outputTextBox.AppendText(String.Join(Environment.NewLine, msgs) + Environment.NewLine)), _cancellationTokenSource.Token, TaskCreationOptions.None, uiContext);
}
}
catch (OperationCanceledException)
{
//TODO:
}
}
I start the tasks inside a button click, like this:
var results = new BlockingCollection<object>();
var display = Task.Factory.StartNew(() => DisplayMessages(results));
var readMessages = Task.Factory.StartNew(() => ReadMessages(results));
Task[] tasks = new Task[] { display, readMessages };
try
{
await Task.Factory.ContinueWhenAll(tasks, result => { results.CompleteAdding(); }, _cancellationTokenSource.Token, TaskContinuationOptions.None, uiContext);
}
catch (TaskCanceledException)
{
//TODO:
}
This works fine, and when running idly it prints the messages from the device without a problem. However, after the device starts doing work under a really heavy load (the consumer is called so quickly it locks the UI temporarily) that I notice the output textbox is repeating values. It is my understanding that GetConsumingEnumerable()
also removes items from the blocking collection, but I don't know why else I would see the messages printed multiple times. Each message has a timestamp, and when I readMessages from the device it clears the buffer so I know that I am not reading that message multiple times.
Am I missing something here? Is there a better way to handle this producer/consumer scenario to ensure accurate data? I have looked to see if there are references somewhere that may be overlapping, but I don't see it.