0

I'm currently developing a server that deals with clients using a consumer/producer approach with threads and a blocking collection as shown below:

public class NetworkClient
{
    private bool _started = false;
    private int _clientId;
    private readonly Socket _socket;

    private static AutoResetEvent _lastMessageWasAckd = new AutoResetEvent(true);
    private static BlockingCollection<byte[]> _messageQueue = new BlockingCollection<byte[]>(); 
        

    public NetworkClient(Socket socket, int clientId)
    {
        this._socket = socket;
        this._clientId = clientId;
    }

    public int getClientID() {
        return _clientId;
    }

    public void SendMessage(string message)
    {
        Console.WriteLine("Adding to player's sending queue " + _clientId);
        _messageQueue.Add(Encoding.ASCII.GetBytes(message));
    }


    public void Start()
    {
        Thread receiver = new Thread(new ThreadStart(ReceivingThreadProc));
        Thread sender = new Thread(new ThreadStart(SendingThreadProc));
        receiver.Start();
        sender.Start();
        this._started = true;
    }
        
    public void Stop()
    {
        this._started = false;
    }

    private void ReceivingThreadProc()
    {
        byte[] bytes = new byte[1024];
        string data;
        try
        {
            while (_started && _socket.Connected)
            {
                int numByte = this._socket.Receive(bytes);
                data = Encoding.ASCII.GetString(bytes, 0, numByte);
                if (numByte == 0)
                {
                    break;
                }

                if (data == "ACK")
                {
                    _lastMessageWasAckd.Set();
                    continue;
                }

                // Acknowledge the message
                _socket.Send(Encoding.ASCII.GetBytes("ACK"));

                ServerReceiver.onEvent(this._clientId, data);
            }
        }
        catch (Exception e)
        {
            this._socket.Close();
        }
    }

    private void SendingThreadProc()
    {
        while (_started && _socket.Connected)
        {
            _lastMessageWasAckd.WaitOne();

            byte[] message = _messageQueue.Take();
                
            Console.WriteLine("Sending the following message to client number: " + _clientId);
            Console.WriteLine(System.Text.Encoding.ASCII.GetString(message));
                
            _socket.Send(message);

            _lastMessageWasAckd.Reset();
        }
    }
}

There will be an instance of NetworkClient created for every client that connects for the server. The issue is that sometimes a message is queued to be sent to client 1 (this is confirmed by the Console.Writeline in the SendMessage method) however that message is sent to client 0 (Shown by the console writeline in the SendingThreadProc method) instead. Is this due to a thread safety issue, or am I missing something entirely? This typically happens when two messages are sent right after one another.

Any help would be greatly appreciated.

EDIT:

As many people rightly pointed out I haven't added where I call SendMessage I'll put this class down below:

class NetworkServer
{
    private int latestClient = 0;
    private ServerReceiver _serverReceiver;
    private static readonly Dictionary<int, NetworkClient> clients = new Dictionary<int, NetworkClient>();
        
    public NetworkServer()
    {
        IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 5656);
        Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        this._serverReceiver = new ServerReceiver();
        this._serverReceiver.start();

        try
        {
            listener.Bind(endpoint);
            listener.Listen(10);

            while (true)
            {
                Socket clientSocket = listener.Accept();
                this.OnClientJoin(clientSocket);
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }
public static void SendClientMessage(int clientId, string package, CommandType commandType,
    Dictionary<string, object> data = null)
{
    if (data == null)
    {
        data = new Dictionary<string, object>();
    }
        
    SendClientMessageRaw(clientId, new NetworkCommand(clientId, package, commandType, data).ToJson());
}

public static void SendClientMessageRaw(int id, string message)
{
    Console.WriteLine("Supposed to send to client number " + clients[id].getClientID());
    clients[id].SendMessage(message);
}

private void OnClientJoin(Socket socket)
{
    // Add client to array, perform handshake?
    NetworkClient networkClient = new NetworkClient(socket, latestClient);
    clients.Add(latestClient, networkClient);
    Console.WriteLine("player added :" + latestClient);
    networkClient.Start();
    latestClient++;
        
    if (latestClient == 2)
    {
        SendClientMessage(1, "Test", CommandType.Ping, null);
    }
}
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
Tayyab Hussain
  • 107
  • 2
  • 10
  • You have not shown where you call the `SendMessage` method. – Theodor Zoulias Jan 24 '22 at 01:03
  • 1
    By the way since `_started` is used for cross-thread communication, it should be `volatile`, and it should also be set to `true` *before* starting the threads. That doesn't cause your problem here, but fix it anyway. – harold Jan 24 '22 at 01:06
  • `Socket`, `ThreadStart`, _synchronous I/O_ - this is a very _old school_ way of performing I/O. Look into `TcpClient`, `TcpListener`, `await Beginxxx` / `await xxxAsync` instead –  Jan 24 '22 at 01:10
  • The `BlockingCollection` class is supposed to free you from the burden of managing `AutoResetEvent` synchronization primitives. Since you are using both a `BlockingCollection` and an `AutoResetEvent`, you are probably doing something wrong. I can't say anything more specific, because the details of what you are trying to do are missing from the question. – Theodor Zoulias Jan 24 '22 at 01:11
  • 1
    @harold _"By the way since _started is used for cross-thread communication, it should be `volatile`"_ - incorrect. _["Frankly, I **discourage you from ever making a volatile field.** Volatile fields are a sign that you are doing something downright crazy: you're attempting to read and write the same value on two different threads without putting a lock in place..."](https://stackoverflow.com/a/17530556/585968)._ Consider using one of the `Interlocked` methods instead –  Jan 24 '22 at 01:16
  • I have edited my post to provide more details. @TheodorZoulias Im using the AutoResetEvent as a switch to wait for an acknowledgement for the client before proceeding. Most of this code is very similarly mirrored on the client. – Tayyab Hussain Jan 24 '22 at 01:19
  • @MickyD using the `Interlocked` is probably as crazy as using the `volatile`. In this particular case (ensuring the visibility of the `_started` field) it's not very crazy, provided that you know what you are doing. – Theodor Zoulias Jan 24 '22 at 02:01
  • 1
    @MickyD that's not bad advice, but `volatile` is sufficient for this use. The only thing that needs to be prevented is the thread never reading the new value of the flag. There is no other variable that's being synchronized by virtue of the flag, *that* is a very suspicious pattern. You can consider using the interlocked methods or whatever else, but this is one of the *very few* cases where `volatile` is fine. – harold Jan 24 '22 at 03:03
  • Will using volatile/interlocked actually make a difference? Also that wont fix the aforementioned problem will it? – Tayyab Hussain Jan 24 '22 at 11:12
  • @MickyD How would await help me here? – Tayyab Hussain Jan 24 '22 at 12:55

1 Answers1

2

Could it be because your message queue is static and therefore shared between all NetworkClients? Meaning client A can pick up a message for client B?

Might be as easy as removing static from the properties.

Max Hayman
  • 36
  • 2
  • When i remove the static property messages no longer get sent at all. – Tayyab Hussain Jan 24 '22 at 01:22
  • @TayyabHussain if you are going to use a `static` queue then you should have only 1 reader and 1 writer thread _to handle all `NetworkClient` instances_ **not** spin up threads _per instance of `NetworkClient`_ –  Jan 24 '22 at 02:45
  • @MickyD why is it that when the static keyword is removed. No messages are sent at all? – Tayyab Hussain Jan 24 '22 at 11:04
  • Apologies, it was because it was static. The only reason that didn't work for me was because autoreset was also static and shouldn't have been. – Tayyab Hussain Jan 27 '22 at 10:23