I actually do something similar in another project. What I learned or would do differently are the following:
First of all, better to use dedicated threads for the reading/writing loop (with new Thread(ParameterizedThreadStart)
) because Task.Run
uses a pool thread and as you use it in a (nearly) endless loop the thread is practically never returned to the pool.
var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
thread.Start(cancellationToken);
Your Process
can be an event, which you can invoke asynchronously so your reader loop can be return immediately to process the new incoming packages as fast as possible:
private void ReaderLoop(object state)
{
var token = (CancellationToken)state;
while (!token.IsCancellationRequested)
{
try
{
var message = MessageQueue.Take(token);
OnMessageReceived(new MessageReceivedEventArgs(message));
}
catch (OperationCanceledException)
{
if (!disposed && IsRunning)
Stop();
break;
}
}
}
Please note that if a delegate has multiple targets it's async invocation is not trivial. I created this extension method for invoking a delegate on pool threads:
public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
{
void Callback(IAsyncResult ar)
{
var method = (EventHandler<TEventArgs>)ar.AsyncState;
try
{
method.EndInvoke(ar);
}
catch (Exception e)
{
HandleError(e, method);
}
}
foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
handler.BeginInvoke(sender, args, Callback, handler);
}
So the OnMessageReceived
implementation can be:
protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
=> messageReceivedHandler.InvokeAsync(this, e);
Finally it was a big lesson that BlockingCollection<T>
has some performance issues. It uses SpinWait
internally, whose SpinOnce
method waits longer and longer times if there is no incoming data for a long time. This is a tricky issue because even if you log every single step of the processing you will not notice that everything is started delayed unless you can mock also the server side. Here you can find a fast BlockingCollection
implementation using an AutoResetEvent
for triggering incoming data. I added a Take(CancellationToken)
overload to it as follows:
/// <summary>
/// Takes an item from the <see cref="FastBlockingCollection{T}"/>
/// </summary>
public T Take(CancellationToken token)
{
T item;
while (!queue.TryDequeue(out item))
{
waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
token.ThrowIfCancellationRequested();
}
return item;
}
Basically that's it. Maybe not everything is applicable in your case, eg. if the nearly immediate response is not crucial the regular BlockingCollection
also will do it.