I'm building a server app that accepts incoming TCP connections (roughly 300 unique clients). It's important to note that I do not have control over the clients.
I have found that some of the connecting clients remain idle for quite some time after making the initial connection and sending the first status update. When they remain idle for over 5 mins the application's CPU usage jumps to over 90% and remains there.
To address this issue I built in a cancellation token that is triggered after 4 mins. This allows me to kill the connection. The client then detects this and reconnects about a minute later. This solves the high CPU usage issue, but it has the side effect of high memory usage; there seems to be a memory leak. I suspect the resources are being held by the previous socket object.
I have a client object that contains the socket connection and information about the connected client. It also manages the incoming messages. There is also a manager class which accepts the incoming connections. It then creates the client object, assigns the socket to it and adds the client object to a concurrent dictionary. Every 10 seconds it checks the dictionary for clients that have been set to _closeConnection = true and calls their dispose method.
Here is some of the client object code:
/// <summary>
/// Starts a background task that repeatedly receives from the client socket
/// and dispatches every valid message to <c>HandleClientMessage</c>.
/// </summary>
/// <remarks>
/// The loop ends when <c>IsConnected</c> becomes false, when the peer closes
/// the connection (zero-byte read), when the cancellation token fires, or when
/// the receive fails. In the last three cases <c>_closeConnection</c> is set
/// and <c>DisconnectReason</c> records why.
/// </remarks>
public void StartCommunication()
{
    Task.Run(async () =>
    {
        ArraySegment<byte> buffer = new ArraySegment<byte>(new byte[75]);
        while (IsConnected)
        {
            try
            {
                // The pending receive is abandoned (not aborted) when the token fires;
                // it only completes once the socket itself is closed during disposal.
                var result = await SocketTaskExtensions.ReceiveAsync(ClientConnection.Client, buffer, SocketFlags.None).WithCancellation(cts.Token);
                if (result == 0)
                {
                    // A zero-byte read means the remote side closed the connection.
                    // Every further receive would also return 0 immediately, so
                    // staying in the loop would spin at full CPU.
                    _closeConnection = true;
                    DisconnectReason = "Client closed the connection";
                    break;
                }

                // NOTE(review): the whole 75-byte array is handed over regardless of how
                // many bytes were received, so bytes from an earlier, longer read may
                // still be present - confirm ClientMessage copes with that.
                var message = new ClientMessage(buffer.Array, true);
                if (message.IsValid)
                {
                    HandleClientMessage(message);
                }
            }
            catch (OperationCanceledException)
            {
                _closeConnection = true;
                DisconnectReason = "Client has not reported in 4 mins";
                // A cancelled token stays cancelled: looping again would make
                // WithCancellation throw immediately on every iteration.
                break;
            }
            catch (Exception)
            {
                _closeConnection = true;
                DisconnectReason = "Error during receive opperation";
                // A failed receive will not recover on this socket; stop instead of retrying.
                break;
            }
        }
    });
}
/// <summary>
/// Releases the resources held by this client by running the disposing path
/// of <see cref="Dispose(bool)"/>, then tells the garbage collector that
/// finalization is no longer required for this instance.
/// </summary>
public void Dispose()
{
    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Cancels the pending receive, closes the client's socket and TCP connection,
/// and disposes the cancellation token source.
/// </summary>
/// <param name="disposing">
/// True when called from <c>Dispose()</c>; nothing is done when false.
/// </param>
protected virtual void Dispose(bool disposing)
{
    if (!disposing)
    {
        return;
    }

    _closeConnection = true;

    try
    {
        cts.Cancel();
    }
    catch (ObjectDisposedException)
    {
        // Dispose was already called once; Cancel throws on a disposed source
        // and Dispose must be safe to call repeatedly.
    }

    // Same connection object the receive loop reads from. Closing it is what
    // finally completes a receive that was abandoned on cancellation.
    if (ClientConnection != null)
    {
        // Explicitly kill the underlying socket
        if (ClientConnection.Client != null)
        {
            ClientConnection.Client.Close();
        }
        ClientConnection.Close();
    }

    cts.Dispose();
}
Task Extension Method:
/// <summary>
/// Awaits <paramref name="task"/> but stops waiting as soon as
/// <paramref name="cancellationToken"/> is cancelled.
/// </summary>
/// <remarks>
/// Cancellation only abandons the wait; the underlying operation keeps running
/// until it completes on its own (for a socket receive: until the socket is closed).
/// </remarks>
/// <typeparam name="T">Result type of the wrapped task.</typeparam>
/// <param name="task">The operation to wait for.</param>
/// <param name="cancellationToken">Token that ends the wait early.</param>
/// <returns>The result of <paramref name="task"/> if it finishes first.</returns>
/// <exception cref="OperationCanceledException">
/// The token was cancelled before <paramref name="task"/> completed.
/// </exception>
public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
{
    // Asynchronous continuations keep the awaiting code from running inline on
    // the thread that calls CancellationTokenSource.Cancel().
    var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs))
    {
        if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false))
        {
            // Nobody will await the abandoned task any more; observe a later
            // fault so it does not surface as an unobserved task exception.
            var ignored = task.ContinueWith(
                t => { var observed = t.Exception; },
                CancellationToken.None,
                TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);

            throw new OperationCanceledException(cancellationToken);
        }
    }

    // Await instead of .Result so a failure is rethrown as the original
    // exception rather than wrapped in an AggregateException.
    return await task.ConfigureAwait(false);
}
Manager Code:
/// <summary>
/// Creates and starts the TCP listener on the configured server port (all
/// local addresses), then launches a background loop that hands every
/// accepted connection to <c>HandleClientConnection</c>.
/// </summary>
/// <returns>Always <c>true</c>.</returns>
public bool StartListener()
{
    int port = Convert.ToInt32(_serverPort);
    _listener = new TcpListenerEx(IPAddress.Any, port);
    _listener.Start();

    Task.Run(async () =>
    {
        // Keep accepting until the flag is cleared.
        while (_maintainConnection)
        {
            try
            {
                var accepted = await _listener.AcceptTcpClientAsync();
                HandleClientConnection(accepted);
            }
            catch (Exception e)
            {
                //<snip>
            }
        }
    });

    return true;
}
/// <summary>
/// Wraps a newly accepted TCP connection in a <c>ClientComsAsync</c>, starts
/// its receive loop, and cleans up an earlier connection registered under the
/// same client name if that connection is no longer alive.
/// </summary>
/// <param name="client">The TCP connection that was just accepted.</param>
private void HandleClientConnection(TcpClient client)
{
    Task.Run(async () =>
    {
        try
        {
            // Create new Coms object. It needs its own name: reusing "client"
            // would redeclare the parameter and assign the object to itself
            // instead of attaching the accepted connection.
            var coms = new ClientComsAsync();
            coms.ClientConnection = client;
            // Start client communication
            coms.StartCommunication();
            //_clients is the ConcurrentDictionary
            ClientComsAsync existingClient;
            if (_clients.TryGetValue(coms.ClientName, out existingClient) && existingClient != null)
            {
                // Probe the old connection first, then re-check whether it is still connected.
                if (existingClient.IsConnected)
                {
                    existingClient.SendHeatbeat();
                }
                if (!existingClient.IsConnected)
                {
                    // Call Dispose on existing client
                    CleanUpClient(existingClient, "Reconnected with new connection");
                }
            }
        }
        catch (Exception e)
        {
            //<snip>
        }
        finally
        {
            //<snip>
        }
    });
}
/// <summary>
/// Removes the entry registered under the client's name from the client
/// dictionary and disposes the removed instance.
/// </summary>
/// <param name="client">Client whose dictionary entry should be removed.</param>
/// <param name="reason">Why the client is being cleaned up (not used in this method).</param>
private void CleanUpClient(ClientComsAsync client, string reason)
{
    if (client == null)
    {
        return;
    }

    // Same dictionary that HandleClientConnection looks clients up in.
    ClientComsAsync removed;
    if (_clients.TryRemove(client.ClientName, out removed) && removed != null)
    {
        removed.Dispose();
    }
}