48

I am an experienced C# developer, but I have never developed a TCP server application. Now I have to develop a highly scalable, high-performance server that can handle at least 5-10 thousand concurrent connections, receiving raw byte data via GPRS from GPS devices.

A common communication process should look like this:

  • the GPS device initiates a connection to my server
  • my server answers whether it wants to receive data
  • the device sends GPS data
  • my server sends a report to the device confirming receipt (something like a checksum)
  • the device sends new data, the server reports again, and this repeats again and again
  • later the GPS DEVICE closes the connection

So, in my server I need to:

  • keep track of connected/active clients
  • close any client from the server side
  • catch the event when a device closes the connection
  • receive byte data
  • send data to clients
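
The bookkeeping items in the list above (tracking active clients, closing one from the server side, noticing a disconnect) could be sketched roughly as follows. `ClientRegistry` and its members are names made up for this illustration, not part of any library; the sketch also assumes the read loop calls `Remove` when a read returns 0 bytes, which is how a TCP server normally observes an orderly close by the peer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Net.Sockets;

// Hypothetical helper (not from any library): tracks connected devices by an id.
public sealed class ClientRegistry
{
    private readonly ConcurrentDictionary<Guid, TcpClient> _clients =
        new ConcurrentDictionary<Guid, TcpClient>();

    // Raised after a client has been removed, whichever side closed the connection.
    public event Action<Guid> ClientDisconnected;

    public int Count { get { return _clients.Count; } }

    // Call after accepting a connection; returns the id used to refer to it later.
    public Guid Add(TcpClient client)
    {
        Guid id = Guid.NewGuid();
        _clients[id] = client;
        return id;
    }

    // Call from the server to drop a client, or from the read loop when a read
    // returns 0 bytes (remote close) or throws. Returns false for unknown ids.
    public bool Remove(Guid id)
    {
        TcpClient client;
        if (!_clients.TryRemove(id, out client))
            return false;

        client.Close();
        Action<Guid> handler = ClientDisconnected;
        if (handler != null)
            handler(id);
        return true;
    }
}
```

Because `TryRemove` succeeds only once per id, the event fires a single time even if the server and the read loop both try to remove the same client.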

I started to read about this topic over the internet, but it seems to be a nightmare for me. There are a lot of ways, but I could not find out which is the best.

Async socket methods seem the best for me, but writing code in this async style is terrible and not easy to debug.

So my question is: what do you think is the best way to implement a high-performance TCP server in C#? Do you know of any good open source component for this? (I tried several, but I could not find a good one.)

Cœur
Tom
  • 1
    Not sure I'm understanding the flow. What does "my server sends report to the device about getting it" mean? Also, why do you need to keep the connection open? Why not just open them when necessary? – NotMe May 16 '11 at 21:04
  • 1
    Can you elaborate on why you say "writing code in this async style is terrible and not easy to debug" ? – Peter K. May 16 '11 at 21:05
  • So, it has to stay open, because they are GPRS connections, and when the devices connect - mainly when they are roaming - it costs money. So it is all about the money: more connections = more fees. Report means: the server has to send a report to the device saying that it got the data. (It is a few bytes.) When the device gets this report, it can delete the sent data from its memory; if it does not get this report, it tries to send the packet again. – Tom May 16 '11 at 21:10
  • I am working on the same project. (I think you are building a GPS tracking system or something similar, aren't you?) I would like to share experience with you. There is no private messaging on StackExchange. Can you contact me? Address : (my-nick-name AT gmail.com) (quickly before this post gets deleted :p) Thank you. – MiniScalope Nov 24 '11 at 20:27

5 Answers

40

It must be async, there is no way around this. High performance and scalability don't mix with one-thread-per-socket. You can have a look at what StackExchange themselves are doing: see "async Redis await BookSleeve", which leverages the CTP features from the next C# release (so it is on the edge and subject to change, but it is cool). For something even more bleeding edge, the solutions revolve around the SocketAsyncEventArgs class, which takes things one step further by eliminating the frequent allocations of async handlers associated with 'classic' C# async processing:

The SocketAsyncEventArgs class is part of a set of enhancements to the System.Net.Sockets.Socket class that provide an alternative asynchronous pattern that can be used by specialized high-performance socket applications. This class was specifically designed for network server applications that require high performance. An application can use the enhanced asynchronous pattern exclusively or only in targeted hot areas (for example, when receiving large amounts of data).
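
As a rough illustration of the pattern the quoted documentation describes, the sketch below reuses a single `SocketAsyncEventArgs` instance and a single buffer for every receive on one socket. `SaeaReceiver`, `DataReceived` and `Closed` are hypothetical names chosen for this example; only `SetBuffer`, `Completed`, `ReceiveAsync`, `BytesTransferred`, `SocketError` and `Buffer` come from the framework. Error handling is deliberately minimal.

```csharp
using System;
using System.Net.Sockets;

// Hypothetical wrapper: one reusable SocketAsyncEventArgs per connection.
public sealed class SaeaReceiver
{
    private readonly Socket _socket;
    private readonly SocketAsyncEventArgs _args = new SocketAsyncEventArgs();

    // Callbacks invented for this sketch. The byte[] is the shared receive
    // buffer, so copy anything that must outlive the callback.
    public Action<byte[], int> DataReceived;
    public Action Closed;

    public SaeaReceiver(Socket socket, int bufferSize)
    {
        _socket = socket;
        _args.SetBuffer(new byte[bufferSize], 0, bufferSize);
        _args.Completed += (sender, e) => { if (Process(e)) Receive(); };
    }

    public void Start() { Receive(); }

    private void Receive()
    {
        // ReceiveAsync returns false when the operation completed synchronously;
        // Completed is not raised in that case, so the result is handled here.
        while (!_socket.ReceiveAsync(_args))
        {
            if (!Process(_args))
                return;
        }
    }

    // Returns true if another receive should be posted.
    private bool Process(SocketAsyncEventArgs e)
    {
        if (e.SocketError != SocketError.Success || e.BytesTransferred == 0)
        {
            // Zero bytes means the peer closed the connection.
            _socket.Close();
            if (Closed != null) Closed();
            return false;
        }
        if (DataReceived != null) DataReceived(e.Buffer, e.BytesTransferred);
        return true;
    }
}
```

A real server would typically draw the event args and buffers from a pool shared across connections rather than allocating one pair per socket as done here.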

Long story short: learn async or die trying...

BTW, if you're asking why async, then read the three articles linked from this post: High Performance Windows programs. The ultimate answer is: the underlying OS design requires it.
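
For a sense of what the async style looks like with the `async`/`await` keywords as they later shipped in C# 5, here is a minimal sketch of one device session. It is written against `Stream` so it can be tried without a network; with a real connection the stream would come from `TcpClient.GetStream()`. The one-byte checksum acknowledgement is purely an assumption made for this example, since the actual device protocol is not given in the question.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class DeviceSession
{
    // Hypothetical acknowledgement: the sum of the received bytes modulo 256.
    // The real device protocol defines its own report format.
    public static byte Checksum(byte[] data, int count)
    {
        int sum = 0;
        for (int i = 0; i < count; i++)
            sum += data[i];
        return (byte)(sum & 0xFF);
    }

    // Reads chunks until the peer closes the stream, acknowledging each one.
    // Returns the number of chunks acknowledged. No thread is blocked while a
    // read is pending on a network stream.
    public static async Task<int> RunAsync(Stream stream)
    {
        byte[] buffer = new byte[1024];
        int chunks = 0;
        while (true)
        {
            int read = await stream.ReadAsync(buffer, 0, buffer.Length);
            if (read == 0)
                return chunks; // remote side closed the connection

            byte[] ack = { Checksum(buffer, read) };
            await stream.WriteAsync(ack, 0, ack.Length);
            chunks++;
        }
    }
}
```

Note that TCP is a byte stream, so one chunk returned by `ReadAsync` is not necessarily one complete device packet; a real server still needs message framing on top of this loop.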

Kind Contributor
Remus Rusanu
  • 1
    Wouldn't making a thread for each client be just as efficient? I mean, when you call an async method you are essentially starting a thread that invokes a delegate when it's done, or at least that's how I understand it. Also, if that's true, you'd probably end up with a higher thread count, which probably isn't the best thing for performance. If that's correct, then it has to create a new thread for each call, where a single thread looping through the standard blocking methods would seem more efficient. – Kelly Elton Nov 30 '11 at 04:30
  • 1
    @kelton52: your understanding is completely wrong. You need to read about async IO. – Remus Rusanu Nov 30 '11 at 06:18
  • 3
    @RemusRusanu If my understanding is completely wrong, then why is it that when I run async tasks, they create threads? How is it possible to run a task asynchronously without opening a thread? I think a comment besides "your understanding is completely wrong" is warranted in this situation, as that's not much of a counter-argument. – Kelly Elton Nov 30 '11 at 22:54
  • Kelton, it is very expensive and inefficient to use a thread per client. That way the number of clients is limited; you will never handle thousands of connections using this pattern. Use the async pattern, there is no better way at this time. – Tom Feb 09 '12 at 22:17
  • 2
    You have to understand how async io is handled at the lowest levels (kernel/drivers) to understand why it doesn't require a thread for each IO you post. The easiest way I can get you on the right track in this small space is to point out that deep inside the kernel there is only 1 "thread" controlling everything (pumps all threads), and if you start to think about it from that point of view, you can see how async IO can and IS optimized due to the OS itself. Maybe read about "thread context switching". – eselk Apr 19 '12 at 02:42
  • 1
    "Port Exhaustion" has nothing to do with servers listening for and accepting (no matter how many) client connections on a port. It is only an issue for processes that *initiate* many connections (outbound), i.e. clients that initiate many connection to servers in a short period of time. This is why your link talks about `the default range of ephemeral ports used by *client applications*`. Also check out [this technet article](http://blogs.technet.com/b/askds/archive/2008/10/29/port-exhaustion-and-you-or-why-the-netstat-tool-is-your-friend.aspx) and search for "outbound". – Eugen Dück Apr 08 '15 at 11:47
  • I agree with @EugenDück, Port Exhaustion has nothing to do with the listening server. A web server, for example, always listens on port 80 for all clients. The connections are maintained as pairs of (ServerIP:ServerPort + ClientIP:ClientPort). The former is static, the latter is dynamic; therefore, putting aside memory/paging resources and IP/port reservations, a server can mathematically have ({Number of IP Addresses}*{Number of Ports}) connections = 2^32*2^16 = 281,474,976,710,656 = ~281T client connections. – Kind Contributor Jun 06 '15 at 14:38
15

As Remus says above, you have to use async to keep performance high. In .NET that means the Begin.../End... methods.

Under the hood for sockets, these methods make use of IO Completion Ports which seems to be the most performant way of processing many sockets on Windows operating systems.

As Jim says, the TcpClient class can help here and is pretty easy to use. Here is an example of using the TcpListener to listen for incoming connections and the TcpClient to handle them, with the initial BeginAccept and BeginRead calls being async.

This example assumes a message-based protocol over the sockets. The protocol details are omitted, except that the first 4 bytes of each transmission hold the length of the message; that allows a synchronous Read on the stream to fetch the rest of the data, which should already be buffered.

Here is the code:

using System;
using System.IO;
using System.Net;
using System.Net.Sockets;

class ClientContext
{
    public TcpClient Client;
    public Stream Stream;
    public byte[] Buffer = new byte[4];
    public MemoryStream Message = new MemoryStream();
}

class Program
{
    static void OnMessageReceived(ClientContext context)
    {
        // process the message here
    }

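    static void OnClientRead(IAsyncResult ar)
    {
        ClientContext context = ar.AsyncState as ClientContext;
        if (context == null)
            return;

        try
        {
            // EndRead returns 0 when the remote side has closed the connection.
            int read = context.Stream.EndRead(ar);
            if (read == 0)
                throw new EndOfStreamException();

            // The async read may deliver fewer than 4 bytes; fetch the rest of the length prefix.
            while (read < context.Buffer.Length)
            {
                int n = context.Stream.Read(context.Buffer, read, context.Buffer.Length - read);
                if (n == 0)
                    throw new EndOfStreamException();
                read += n;
            }

            // Start a fresh message: the 4-byte length prefix followed by the payload.
            context.Message.SetLength(0);
            context.Message.Write(context.Buffer, 0, read);

            int length = BitConverter.ToInt32(context.Buffer, 0);
            byte[] buffer = new byte[1024];
            while (length > 0)
            {
                read = context.Stream.Read(buffer, 0, Math.Min(buffer.Length, length));
                if (read == 0)
                    throw new EndOfStreamException(); // closed mid-message; don't spin forever
                context.Message.Write(buffer, 0, read);
                length -= read;
            }

            OnMessageReceived(context);
        }
        catch (System.Exception)
        {
            // Any failure (including a remote close) ends this client's session.
            context.Client.Close();
            context.Stream.Dispose();
            context.Message.Dispose();
            context = null;
        }
        finally
        {
            // Queue the next async read only if the client is still alive.
            if (context != null)
                context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnClientRead, context);
        }
    }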

    static void OnClientAccepted(IAsyncResult ar)
    {
        TcpListener listener = ar.AsyncState as TcpListener;
        if (listener == null)
            return;

        try
        {
            ClientContext context = new ClientContext();
            context.Client = listener.EndAcceptTcpClient(ar);
            context.Stream = context.Client.GetStream();
            context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnClientRead, context);
        }
        finally
        {
            listener.BeginAcceptTcpClient(OnClientAccepted, listener);
        }
    }

    static void Main(string[] args)
    {
        TcpListener listener = new TcpListener(new IPEndPoint(IPAddress.Any, 20000));
        listener.Start();

        listener.BeginAcceptTcpClient(OnClientAccepted, listener);

        Console.Write("Press enter to exit...");
        Console.ReadLine();
        listener.Stop();
    }
}

It demonstrates how to handle the async calls, but it still needs more error handling: to make sure the TcpListener always keeps accepting new connections, and to cope with clients that disconnect unexpectedly. There also seem to be a few cases where not all of the data arrives in one go, and those need handling too.
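
One way to deal with data that does not arrive in one go is to keep the framing logic separate from the socket code. The sketch below is one possible approach, not part of the example above: `FrameAccumulator` is a hypothetical helper that is fed whatever each read returns and hands back only complete messages, using the same 4-byte length prefix (in `BitConverter` byte order) that the example assumes. Unlike the `Message` stream in the example, the returned payloads do not include the prefix.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: collects raw bytes and yields complete messages framed
// as a 4-byte length prefix followed by that many payload bytes.
public sealed class FrameAccumulator
{
    private readonly List<byte> _pending = new List<byte>();

    // Feed the bytes a read returned; get back every message that is now complete.
    public List<byte[]> Append(byte[] data, int count)
    {
        for (int i = 0; i < count; i++)
            _pending.Add(data[i]);

        var messages = new List<byte[]>();
        while (_pending.Count >= 4)
        {
            int length = BitConverter.ToInt32(_pending.GetRange(0, 4).ToArray(), 0);
            if (length < 0)
                throw new InvalidOperationException("Negative message length.");
            if (_pending.Count - 4 < length)
                break; // payload not fully received yet; wait for more data

            messages.Add(_pending.GetRange(4, length).ToArray());
            _pending.RemoveRange(0, 4 + length);
        }
        return messages;
    }
}
```

With this in place every read can be asynchronous, because the handler never has to block waiting for the remainder of a message. A production version would also cap the accepted length so a corrupt prefix cannot make the buffer grow without bound.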

Simon
  • Nice and simple. Take note that the first 4 bytes received must be the message length for this to work. Otherwise, if you receive fewer than 4 bytes, it will loop in `while (length > 0)`. To fix this I just use `context.Client.ReceiveBufferSize` as the initial size for `context.Buffer` and handle protocol-related stuff in the `OnMessageReceived` part. – Laov May 31 '22 at 09:34
3

You can do this with the TcpClient class, although to tell the truth I don't know if you could have 10 thousand open sockets. That's quite a lot. But I regularly use TcpClient to handle dozens of concurrent sockets. And the asynchronous model is actually very nice to use.

Your biggest problem isn't going to be making TcpClient work. With 10 thousand concurrent connections, I'm thinking bandwidth and scalability are going to be problems. I don't even know if one machine can handle all that traffic. I suppose it depends on how large the packets are and how often they're coming in. But you'd better do some back-of-the-envelope estimation before you commit to implementing this all on a single computer.
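
The back-of-the-envelope estimate can be as simple as the function below. The connection count (10,000) comes from the question and the packet size (1024 bytes) from the asker's comment on this answer; the reporting interval is not stated anywhere, so the 10 seconds used in the usage note is purely an assumed figure. `LoadEstimate` is a made-up name.

```csharp
using System;

public static class LoadEstimate
{
    // Average inbound payload rate in megabits per second (1 Mbit = 1,000,000 bits).
    // Ignores TCP/IP header overhead and the acknowledgements sent back to devices.
    public static double InboundMegabitsPerSecond(
        int connections, int bytesPerPacket, double secondsBetweenPackets)
    {
        double bytesPerSecond = connections * (double)bytesPerPacket / secondsBetweenPackets;
        return bytesPerSecond * 8 / 1000000.0;
    }
}
```

For example, `LoadEstimate.InboundMegabitsPerSecond(10000, 1024, 10.0)` works out to roughly 8.2 Mbit/s of payload. Plugging in the real reporting interval of the devices shows quickly whether a single machine and its network link are plausible.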

Jim Mischel
  • These packets are small, 1024 bytes per packet. How often? That depends on the settings of the GPS devices, on the speed of movement, on the heading and things like that. – Tom May 16 '11 at 21:06
  • The question title speaks of a TCP Server. Either you meant to say TCPListener or the nature of your answer is incorrect. Even if data would ultimately be distributed from one or more source GPS to one or more consuming clients, both publishers and subscribers would likely connect to the TCP Server to either publish or receive data. – Kind Contributor Jun 06 '15 at 14:44
2

I think you should also look at UDP techniques. For 10k clients it is fast, but the catch is that you have to implement an acknowledgement for each message yourself to confirm that it was received. With UDP you don't need to open a socket for each client, but you do need a heartbeat/ping mechanism every x seconds to check which clients are still connected.
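
The heartbeat bookkeeping mentioned above might look something like this sketch. `HeartbeatTracker` is a hypothetical name, and the timeout value is left to the caller; the current time is passed in explicitly so the logic can be exercised without waiting. The class is not thread-safe, so calls would need to be serialized (or guarded by a lock) in a real server.

```csharp
using System;
using System.Collections.Generic;
using System.Net;

// Hypothetical helper: remembers when each UDP endpoint was last heard from.
public sealed class HeartbeatTracker
{
    private readonly Dictionary<IPEndPoint, DateTime> _lastSeen =
        new Dictionary<IPEndPoint, DateTime>();
    private readonly TimeSpan _timeout;

    public HeartbeatTracker(TimeSpan timeout) { _timeout = timeout; }

    public int Count { get { return _lastSeen.Count; } }

    // Call for every datagram (data or ping) received from a client.
    public void Touch(IPEndPoint client, DateTime now)
    {
        _lastSeen[client] = now;
    }

    // Removes and returns the clients that have been silent longer than the timeout.
    public List<IPEndPoint> RemoveExpired(DateTime now)
    {
        var expired = new List<IPEndPoint>();
        foreach (KeyValuePair<IPEndPoint, DateTime> pair in _lastSeen)
        {
            if (now - pair.Value > _timeout)
                expired.Add(pair.Key);
        }
        foreach (IPEndPoint client in expired)
            _lastSeen.Remove(client);
        return expired;
    }
}
```

A timer would call `RemoveExpired(DateTime.UtcNow)` periodically and treat the returned endpoints as disconnected.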

Rndm
samad
  • Agreed. This project (finished by now, right?) should be approached as a "soft" real time application. Now the only thing I'm trying to wrap my head around is why this all has to run on one machine. – FredTheWebGuy Jun 11 '13 at 23:37
0

You can use the TCP CSharpServer I have made. It is very simple to use: just implement IClientRequest on one of your classes.

using System;
using System.Collections.Generic;
using System.Linq;

namespace cSharpServer
{
    public interface IClientRequest
    {        
        /// <summary>
        /// this needs to be set, otherwise the server will not be able to handle the request.
        /// </summary>
        byte IdType { get; set; } // This is used for Execution.
        /// <summary>
        /// handle the process by the client.
        /// </summary>
        /// <param name="data"></param>
        /// <param name="client"></param>
        /// <returns></returns>
        byte[] Process(BinaryBuffer data, Client client);
    }
}

BinaryBuffer allows you to read the data sent to the server really easily.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;

namespace cSharpServer
{
    public class BinaryBuffer
    {
        private const string Str0001 = "You are at the End of File!";
        private const string Str0002 = "You are Not Reading from the Buffer!";
        private const string Str0003 = "You are Currently Writing to the Buffer!";
        private const string Str0004 = "You are Currently Reading from the Buffer!";
        private const string Str0005 = "You are Not Writing to the Buffer!";
        private const string Str0006 = "You are trying to Reverse Seek, Unable to add a Negative value!";
        private bool _inRead;
        private bool _inWrite;
        private List<byte> _newBytes;
        private int _pointer;
        public byte[] ByteBuffer;

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public override string ToString()
        {
            return Helper.DefaultEncoding.GetString(ByteBuffer, 0, ByteBuffer.Length);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer(string data)
            : this(Helper.DefaultEncoding.GetBytes(data))
        {
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer()
        {
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer(byte[] data)
            : this(ref data)
        {
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer(ref byte[] data)
        {
            ByteBuffer = data;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void IncrementPointer(int add)
        {
            if (add < 0)
            {
                throw new Exception(Str0006);
            }
            _pointer += add;
            if (EofBuffer())
            {
                throw new Exception(Str0001);
            }
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public int GetPointer()
        {
            return _pointer;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public static string GetString(ref byte[] buffer)
        {
            return Helper.DefaultEncoding.GetString(buffer, 0, buffer.Length);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public static string GetString(byte[] buffer)
        {
            return GetString(ref buffer);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void BeginWrite()
        {
            if (_inRead)
            {
                throw new Exception(Str0004);
            }
            _inWrite = true;

            _newBytes = new List<byte>();
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(float value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            _newBytes.AddRange(BitConverter.GetBytes(value));
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(byte value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            _newBytes.Add(value);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(int value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }

            _newBytes.AddRange(BitConverter.GetBytes(value));
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(long value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            // BitConverter yields the same native-endian bytes without needing unsafe code.
            _newBytes.AddRange(BitConverter.GetBytes(value));
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public int UncommitedLength()
        {
            return _newBytes == null ? 0 : _newBytes.Count;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
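        public void WriteField(string value)
        {
            // Prefix with the encoded byte count (not the character count) so that
            // ReadString(size) reads back exactly the bytes written here.
            byte[] byteArray = Helper.DefaultEncoding.GetBytes(value);
            Write(byteArray.Length);
            Write(byteArray);
        }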

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(string value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            byte[] byteArray = Helper.DefaultEncoding.GetBytes(value);
            _newBytes.AddRange(byteArray);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(decimal value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            int[] intArray = decimal.GetBits(value);

            Write(intArray[0]);
            Write(intArray[1]);
            Write(intArray[2]);
            Write(intArray[3]);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void SetInt(int value, int pos)
        {
            byte[] byteInt = BitConverter.GetBytes(value);
            for (int i = 0; i < byteInt.Length; i++)
            {
                _newBytes[pos + i] = byteInt[i];
            }
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void SetLong(long value, int pos)
        {
            byte[] byteInt = BitConverter.GetBytes(value);
            for (int i = 0; i < byteInt.Length; i++)
            {
                _newBytes[pos + i] = byteInt[i];
            }
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(byte[] value)
        {
            Write(ref value);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(ref byte[] value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            _newBytes.AddRange(value);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void EndWrite()
        {
            if (ByteBuffer != null)
            {
                _newBytes.InsertRange(0, ByteBuffer);
            }
            ByteBuffer = _newBytes.ToArray();
            _newBytes = null;
            _inWrite = false;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void EndRead()
        {
            _inRead = false;
            _pointer = 0;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void BeginRead()
        {
            if (_inWrite)
            {
                throw new Exception(Str0003);
            }
            _inRead = true;
            _pointer = 0;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public byte ReadByte()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer())
            {
                throw new Exception(Str0001);
            }
            return ByteBuffer[_pointer++];
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public int ReadInt()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(4))
            {
                throw new Exception(Str0001);
            }
            int startPointer = _pointer;
            _pointer += 4;

            return BitConverter.ToInt32(ByteBuffer, startPointer);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public float[] ReadFloatArray()
        {
            float[] dataFloats = new float[ReadInt()];
            for (int i = 0; i < dataFloats.Length; i++)
            {
                dataFloats[i] = ReadFloat();
            }
            return dataFloats;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public float ReadFloat()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(sizeof(float)))
            {
                throw new Exception(Str0001);
            }
            int startPointer = _pointer;
            _pointer += sizeof(float);

            return BitConverter.ToSingle(ByteBuffer, startPointer);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public decimal ReadDecimal()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(16))
            {
                throw new Exception(Str0001);
            }
            return new decimal(new[] { ReadInt(),
                ReadInt(),
                ReadInt(),
                ReadInt()
            });
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public long ReadLong()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(8))
            {
                throw new Exception(Str0001);
            }
            int startPointer = _pointer;
            _pointer += 8;

            return BitConverter.ToInt64(ByteBuffer, startPointer);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public string ReadString(int size)
        {
            return Helper.DefaultEncoding.GetString(ReadByteArray(size), 0, size);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public byte[] ReadByteArray(int size)
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(size))
            {
                throw new Exception(Str0001);
            }
            byte[] newBuffer = new byte[size];

            Array.Copy(ByteBuffer, _pointer, newBuffer, 0, size);

            _pointer += size;

            return newBuffer;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool EofBuffer(int over = 1)
        {
            return ByteBuffer == null || ((_pointer + over) > ByteBuffer.Length);
        }
    }
}

The full project is on GitHub: CSharpServer