
I have a server that communicates with 50 or more devices over a TCP LAN. Each socket has its own Task.Run running a message-reading loop.

I buffer each message that arrives into a blocking queue (one queue per socket). Each blocking queue has its own Task.Run that consumes it with BlockingCollection.Take().

So something like (semi-pseudocode):

Socket Reading Task

Task.Run(() =>
{
    while (notCancelled)
    {
        element = ReadXml(); // ReadXml() and Deserialize<T>() are the application's own helpers
        switch (element)
        {
            case messageheader:
                MessageQueue.Add(Deserialize<MessageType>());
                break;
            ...
        }
    }
});

Message Buffer Task

Task.Run(() =>
{
    while (notCancelled)
    {
        Process(MessageQueue.Take());
    }
});

So that makes 50+ reading tasks plus 50+ more tasks, each blocking on its own buffer.

I did it this way to avoid blocking the reading loop and to let the program distribute processing time across messages more fairly, or so I believe.

Is this an inefficient way to handle it? What would be a better way?

Theodor Zoulias
FinalFortune

3 Answers


You may be interested in the "channels" work, in particular System.Threading.Channels. It aims to provide asynchronous producer/consumer queues, covering both single and multiple producer and consumer scenarios, upper limits, etc. Because the API is asynchronous, you aren't tying up lots of threads just waiting for something to do.
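The "upper limits" part can be illustrated with a bounded channel. The following is a minimal sketch that assumes .NET Core 3.0 or later (where System.Threading.Channels is available without extra packages) and C# 9+ top-level statements. The capacity of 4, the int messages and the simulated 1 ms processing delay are arbitrary illustration values, not taken from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded channel: at most 4 buffered messages (arbitrary example capacity).
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(4)
{
    FullMode = BoundedChannelFullMode.Wait, // WriteAsync waits while the buffer is full
    SingleReader = true,                    // one consumer per device queue
    SingleWriter = true                     // one socket-reading loop per device
});

var processed = new List<int>();

// Consumer: awaits new items instead of blocking a thread while the channel is empty.
Task consumer = Task.Run(async () =>
{
    await foreach (int message in channel.Reader.ReadAllAsync())
    {
        await Task.Delay(1); // simulate slow processing so the producer hits the limit
        processed.Add(message);
    }
});

// Producer: WriteAsync only completes once there is room, throttling a fast device.
for (int i = 0; i < 20; i++)
    await channel.Writer.WriteAsync(i);

channel.Writer.Complete(); // no more messages; ReadAllAsync ends after draining
await consumer;

Console.WriteLine(processed.Count); // 20
```

With `BoundedChannelFullMode.Wait`, `TryWrite` returns false while the buffer is full. A producer that must not lose messages should therefore use `WriteAsync`, as above. The other `FullMode` values drop items instead of waiting.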

Applied to your code, the consumer loop (the one reading from the queue) would become:

while (notCancelled) {
    var next = await queue.Reader.ReadAsync(optionalCancellationToken);
    Process(next);
}

and the producer:

switch (element)
{
    case messageheader:
        // TryWrite succeeds on an unbounded channel unless the channel has been completed
        queue.Writer.TryWrite(Deserialize<MessageType>());
        break;
    ...
}

So: minimal changes.
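Putting the two fragments together gives the question's layout (one queue and one consumer per device) built on channels. The following is a self-contained sketch that assumes .NET Core 3.0+ and C# 9+. The device and message counts, the string payload, and the per-device counter standing in for `Process` are all hypothetical.

The consumer uses `ReadAllAsync`, which ends cleanly once the writer calls `Complete()`. A `ReadAsync` loop would instead get a `ChannelClosedException` at that point.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

const int DeviceCount = 50;        // mirrors the "50 or more devices" in the question
const int MessagesPerDevice = 100; // arbitrary

// Hypothetical stand-in for Process(message): count messages handled per device.
var processedPerDevice = new ConcurrentDictionary<int, int>();
var producers = new Task[DeviceCount];
var consumers = new Task[DeviceCount];

for (int device = 0; device < DeviceCount; device++)
{
    int id = device; // per-iteration copy for the lambdas
    var queue = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions { SingleReader = true, SingleWriter = true });

    // Consumer: replaces Task.Run + BlockingCollection.Take(). While the channel is
    // empty, the loop is suspended without holding a thread; it ends after Complete().
    consumers[id] = Task.Run(async () =>
    {
        await foreach (string message in queue.Reader.ReadAllAsync())
            processedPerDevice.AddOrUpdate(id, 1, (_, count) => count + 1);
    });

    // Producer: stands in for the socket-reading loop.
    producers[id] = Task.Run(() =>
    {
        for (int i = 0; i < MessagesPerDevice; i++)
            queue.Writer.TryWrite($"device {id} message {i}");
        queue.Writer.Complete(); // e.g. when the socket closes
    });
}

await Task.WhenAll(producers);
await Task.WhenAll(consumers);
Console.WriteLine(processedPerDevice.Values.Sum()); // 5000
```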


Alternatively - or in combination - you could look into things like "pipelines" (https://www.nuget.org/packages/System.IO.Pipelines/) - since you're dealing with TCP data, this would be an ideal fit, and is something I've looked at for the custom web-socket server here on Stack Overflow (which deals with huge numbers of connections). Since the API is async throughout, it does a good job of balancing work - and the pipelines API is engineered with typical TCP scenarios in mind, for example partially consuming incoming data streams as you detect frame boundaries. I've written about this usage a lot, with code examples mostly here. Note that "pipelines" doesn't include a direct TCP layer, but the "kestrel" server includes one, or the third-party library https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/ does (disclosure: I wrote it).
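The pipelines pattern consumes only complete frames and leaves partial data in the pipe until the rest arrives. The following is a rough sketch of that pattern. It assumes the System.IO.Pipelines NuGet package is referenced, uses an in-memory `Pipe` in place of a socket, and treats `'\n'` as a hypothetical frame delimiter. The two chunks deliberately split the second message across writes.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

var pipe = new Pipe();
var frames = new List<string>();

// Writer side: stands in for bytes arriving from a socket, split mid-frame.
async Task ProduceAsync()
{
    foreach (string chunk in new[] { "<msg>one</msg>\n<msg>tw", "o</msg>\n<msg>three</msg>\n" })
        await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(chunk));
    await pipe.Writer.CompleteAsync();
}

// Reader side: consume complete frames only, leave any partial frame in the pipe.
async Task ConsumeAsync()
{
    while (true)
    {
        ReadResult result = await pipe.Reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        // Extract every complete '\n'-terminated frame currently buffered.
        SequencePosition? newline;
        while ((newline = buffer.PositionOf((byte)'\n')) != null)
        {
            ReadOnlySequence<byte> frame = buffer.Slice(0, newline.Value);
            frames.Add(Encoding.UTF8.GetString(frame.ToArray()));
            buffer = buffer.Slice(buffer.GetPosition(1, newline.Value)); // skip past '\n'
        }

        // Consumed up to buffer.Start; examined everything up to buffer.End,
        // so the next ReadAsync waits for more data instead of returning the same bytes.
        pipe.Reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
            break; // writer finished; any trailing partial frame is ignored in this sketch
    }
    await pipe.Reader.CompleteAsync();
}

await Task.WhenAll(ProduceAsync(), ConsumeAsync());
Console.WriteLine(string.Join(",", frames)); // <msg>one</msg>,<msg>two</msg>,<msg>three</msg>
```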

Marc Gravell
    Interesting, will be taking a look at all of them. I was looking at TPL Dataflow, but seems like this is the better option. – FinalFortune Nov 22 '18 at 06:18

I do something similar in another project. Here is what I learned, or what I would do differently:

  1. First of all, it is better to use dedicated threads for the reading/writing loops (with new Thread(ParameterizedThreadStart)). Task.Run uses a pool thread, and since you use it in a (nearly) endless loop, that thread is practically never returned to the pool.

    var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
    thread.Start(cancellationToken);
    
  2. Your Process can be an event that you invoke asynchronously, so your reader loop can return immediately and handle new incoming packets as fast as possible:

    private void ReaderLoop(object state)
    {
        var token = (CancellationToken)state;
        while (!token.IsCancellationRequested)
        {
            try
            {
                var message = MessageQueue.Take(token);
                OnMessageReceived(new MessageReceivedEventArgs(message));
            }
            catch (OperationCanceledException)
            {
                if (!disposed && IsRunning)
                    Stop();
                break;
            }
        }
    }
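The following runnable sketch combines items 1 and 2 and assumes .NET 6+ top-level statements. The consumer loop runs on a dedicated thread that receives a `CancellationToken` as its state object.

Instead of the `Take(token)` loop above, it uses `BlockingCollection<T>.GetConsumingEnumerable(token)`, so the thread also exits cleanly when `CompleteAdding()` is called. The message strings and the `received` list, which stands in for `OnMessageReceived`, are hypothetical.

If you prefer to stay with tasks, `Task.Factory.StartNew` with `TaskCreationOptions.LongRunning` hints to the scheduler that the task should get its own thread rather than a pool thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

var messageQueue = new BlockingCollection<string>();
var received = new List<string>(); // stands in for OnMessageReceived(...)
using var cts = new CancellationTokenSource();

void ReaderLoop(object? state)
{
    var token = (CancellationToken)state!;
    try
    {
        // Blocks this dedicated (non-pool) thread while the queue is empty; the loop
        // ends after CompleteAdding(), or throws if the token is cancelled.
        foreach (string message in messageQueue.GetConsumingEnumerable(token))
            received.Add(message);
    }
    catch (OperationCanceledException)
    {
        // Cancellation requested: let the thread exit.
    }
}

var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop), IsBackground = true };
thread.Start(cts.Token);

messageQueue.Add("hello");
messageQueue.Add("world");
messageQueue.CompleteAdding(); // lets the loop finish after draining the queue
thread.Join();

Console.WriteLine(string.Join(" ", received)); // hello world
```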
    

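Please note that if a delegate has multiple targets, its asynchronous invocation is not trivial. I created this extension method for invoking each target of a delegate on pool threads. Note that delegate BeginInvoke/EndInvoke are supported only on .NET Framework; on .NET Core and .NET 5+ they throw PlatformNotSupportedException.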

public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
{
    void Callback(IAsyncResult ar)
    {
        var method = (EventHandler<TEventArgs>)ar.AsyncState;
        try
        {
            method.EndInvoke(ar);
        }
        catch (Exception e)
        {
            HandleError(e, method);
        }
    }

    foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
        handler.BeginInvoke(sender, args, Callback, handler);
}

So the OnMessageReceived implementation can be:

protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
    => messageReceivedHandler.InvokeAsync(this, e);
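On .NET Core and .NET 5+, where `BeginInvoke` is unavailable, the same per-target idea can be expressed with `Task.Run`. This is a sketch, not the answer's own code. It is written as a generic local function rather than an extension method so the block stays self-contained.

The `errors` bag is a hypothetical stand-in for the `HandleError` helper. The three handlers exist only to show that one failing subscriber does not stop the others.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

var errors = new ConcurrentBag<Exception>(); // hypothetical stand-in for HandleError
var calls = new ConcurrentBag<string>();

// Invokes each subscriber of the event independently on a thread-pool thread.
Task InvokeAsync<TEventArgs>(EventHandler<TEventArgs>? eventHandler, object? sender, TEventArgs args)
{
    if (eventHandler is null)
        return Task.CompletedTask; // no subscribers

    var tasks = new List<Task>();
    foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
    {
        tasks.Add(Task.Run(() =>
        {
            try { handler(sender, args); }
            catch (Exception e) { errors.Add(e); } // a failing handler does not affect the others
        }));
    }
    return Task.WhenAll(tasks); // callers may await it or ignore it (fire-and-forget)
}

EventHandler<string>? messageReceived = null;
messageReceived += (s, msg) => calls.Add("logger: " + msg);
messageReceived += (s, msg) => throw new InvalidOperationException("handler failed");
messageReceived += (s, msg) => calls.Add("processor: " + msg);

await InvokeAsync(messageReceived, null, "packet 1");
Console.WriteLine(calls.Count);  // 2
Console.WriteLine(errors.Count); // 1
```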

  3. Finally, a big lesson was that BlockingCollection<T> has some performance issues. It uses SpinWait internally, whose SpinOnce method waits for longer and longer periods when no data arrives for a long time. This is a tricky issue: even if you log every single step of the processing, you will not notice that everything starts with a delay unless you can also mock the server side. Here you can find a fast BlockingCollection implementation that uses an AutoResetEvent to signal incoming data. I added a Take(CancellationToken) overload to it as follows:

    /// <summary>
    /// Takes an item from the <see cref="FastBlockingCollection{T}"/>
    /// </summary>
    public T Take(CancellationToken token)
    {
        T item;
        while (!queue.TryDequeue(out item))
        {
            waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
            token.ThrowIfCancellationRequested();
        }
    
        return item;
    }
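The following is a minimal, hypothetical stand-in for the `FastBlockingCollection<T>` described above: a `ConcurrentQueue<T>` plus an `AutoResetEvent` that is signalled on every add. The `Take` body mirrors the snippet above. The `Add` method, the 50 ms timeout and the int items are illustrative assumptions.

It uses local functions instead of a class only to stay self-contained. Because an `AutoResetEvent` releases a single waiter per `Set()`, this sketch suits the single-consumer case.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var queue = new ConcurrentQueue<int>();
using var waitHandle = new AutoResetEvent(false);
var cancellationCheckTimeout = TimeSpan.FromMilliseconds(50); // "can be 10-100 ms"

void Add(int item)
{
    queue.Enqueue(item);
    waitHandle.Set(); // wake the waiting consumer right away
}

int Take(CancellationToken token)
{
    int item;
    while (!queue.TryDequeue(out item))
    {
        // Returns on Set() or after the timeout, so cancellation is checked periodically.
        waitHandle.WaitOne(cancellationCheckTimeout);
        token.ThrowIfCancellationRequested();
    }
    return item;
}

using var cts = new CancellationTokenSource();

// Consumer on a pool thread for brevity; in the answer's design it would be a dedicated thread.
Task<int> consumer = Task.Run(() =>
{
    int sum = 0;
    for (int i = 0; i < 3; i++)
        sum += Take(cts.Token);
    return sum;
});

Add(1);
Add(2);
Add(3);
int total = await consumer;
Console.WriteLine(total); // 6

// With an empty queue, cancellation is observed within about one timeout period.
cts.Cancel();
bool cancelled = false;
try { Take(cts.Token); }
catch (OperationCanceledException) { cancelled = true; }
Console.WriteLine(cancelled); // True
```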
    

Basically, that's it. Not everything may apply in your case; e.g., if a nearly immediate response is not crucial, the regular BlockingCollection will also do.

György Kőszeg

Yes, this is a bit inefficient, because you block ThreadPool threads. I already discussed this problem in "Using Task.Yield to overcome ThreadPool starvation while implementing producer/consumer pattern".

You can also look at examples of testing a producer-consumer pattern here: https://github.com/BBGONE/TestThreadAffinity

You can use await Task.Yield() in the loop to give other tasks access to this thread.
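The following is a small sketch of such a cooperative loop. It assumes .NET 6+ top-level statements, and the `ProcessBatchAsync` name, the integer "messages" and the yield interval of 100 are hypothetical.

One caveat: `Task.Yield()` only lets other work run between iterations. It does not help while the loop is blocked inside `BlockingCollection.Take()`, which still holds the thread.

```csharp
using System;
using System.Threading.Tasks;

// A long loop on the thread pool that periodically gives up its thread so that
// other queued work items (e.g. other devices' loops) get a turn.
async Task<long> ProcessBatchAsync(int[] messages, int yieldEvery)
{
    long sum = 0;
    for (int i = 0; i < messages.Length; i++)
    {
        sum += messages[i]; // hypothetical stand-in for Process(message)
        if ((i + 1) % yieldEvery == 0)
            await Task.Yield(); // queue the rest of this loop behind other pending work
    }
    return sum;
}

var data = new int[1000];
for (int i = 0; i < data.Length; i++)
    data[i] = i;

// Several such loops share the pool cooperatively instead of monopolizing threads.
long[] results = await Task.WhenAll(
    Task.Run(() => ProcessBatchAsync(data, 100)),
    Task.Run(() => ProcessBatchAsync(data, 100)));

Console.WriteLine(results[0]); // 499500
```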

You can also solve it by using dedicated threads or, better, a custom TaskScheduler that uses its own thread pool. But it is inefficient to create 50+ plain threads; it is better to adjust the tasks so that they are more cooperative.

A BlockingCollection can block the thread for a long time while it waits to write (if it is bounded) or to read (when there are no items). If you use one, it is better to switch to System.Threading.Tasks.Channels (since shipped as System.Threading.Channels): https://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md

Channels don't block the thread while waiting for the collection to become available for writing or reading. There's an example of how they are used here: https://github.com/BBGONE/TestThreadAffinity/tree/master/ThreadingChannelsCoreFX/ChannelsTest
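The "don't block the thread" point can be made concrete with the common `WaitToReadAsync`/`TryRead` consumer pattern. The following sketch uses System.Threading.Channels, the shipped form of the prototype linked above, and assumes .NET Core 3.0+. The string items, the `handled` list and the 50 ms delay are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();
var handled = new List<string>();

// WaitToReadAsync suspends the method (no thread is held) until data is available,
// then TryRead drains everything already buffered. The outer loop ends once the
// writer has completed and the channel is empty.
async Task ConsumeAsync(ChannelReader<string> reader)
{
    while (await reader.WaitToReadAsync())
        while (reader.TryRead(out var item))
            handled.Add(item);
}

Task consumer = ConsumeAsync(channel.Reader);

// Nothing has been written yet: the consumer is suspended, not blocking a thread.
await Task.Delay(50);
channel.Writer.TryWrite("first");
channel.Writer.TryWrite("second");
channel.Writer.Complete();

await consumer;
Console.WriteLine(string.Join(",", handled)); // first,second
```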

Maxim T