I was advised to look into Project Loom and virtual threads as a way to make the one-thread-per-client design practical (I haven't explored that route yet).
After digging through documentation over the weekend, I implemented a better way to accept new connections and route data to the right client without needing an individual thread per client. By building the server on the NIO socket channel API, I was able to drastically increase the client count while lowering overhead and system memory usage. With 3 test PCs each running a test client, the server accepted 36,036 connections while spawning a peak of only 210 worker threads, all managed by a thread pool. (Running the test client on Windows, each PC can only open about 16,400 sockets before a SocketException with the message "maximum connection reached" is thrown; under Debian 11 the limit was a lot lower in my testing.) The worker pool is declared as
private final ExecutorService _workers;
and initialized as follows:
this._workers = Executors.newCachedThreadPool();
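One thing to keep in mind with this choice: Executors.newCachedThreadPool() starts a new thread whenever every existing worker is busy, so the pool has no upper bound. If you want a hard cap, something like the sketch below might do. It is not part of my server; the class name WorkerPools, the method newBoundedPool, and the 60 second idle timeout are made up for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class WorkerPools
{
    private WorkerPools() {}

    /**
     * Hypothetical bounded alternative to Executors.newCachedThreadPool():
     * at most maxThreads workers run at once, extra tasks wait in an
     * unbounded queue, and idle workers exit after 60 seconds.
     */
    static ThreadPoolExecutor newBoundedPool(final int maxThreads)
    {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            maxThreads, maxThreads,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>());
        // Allow core threads to time out too, so an idle server holds no workers.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    public static void main(String[] args)
    {
        ThreadPoolExecutor pool = newBoundedPool(4);
        for (int i = 0; i < 20; i++)
        {
            final int n = i;
            pool.execute(() -> System.out.println("task " + n + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
    }
}
```

The trade-off is that a bounded pool queues work during bursts instead of spawning threads, so latency can rise while memory stays flat.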
Below is the new server main loop. It uses the NIO multiplexer (the Selector object) together with a thread pool that does the work whenever the selector returns a SelectionKey. The next challenge is implementing an SSL server since NIO, as far as I currently understand, does not offer a counterpart to javax.net.ssl.SSLServerSocket.
/**
 * The main server thread; it listens for and accepts incoming client connections.
 */
@Override
public void run()
{
    System.out.println("Awaiting new client connection!");
    while (!this._channel.socket().isClosed())
    {
        try
        {
            // Block until at least one registered channel is ready.
            this._selector.select();
            Iterator<SelectionKey> keys = this._selector.selectedKeys().iterator();
            while (keys.hasNext())
            {
                SelectionKey key = keys.next();
                keys.remove();
                if (key.isAcceptable())
                {
                    ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    sc.register(this._selector, SelectionKey.OP_READ);
                    sc.socket().setTcpNoDelay(true);
                    sc.socket().setSoTimeout(this._cliTimeout);
                    var acceptor = this._factory.create(this._clients, sc);
                    var client = acceptor.createClient(this._workers);
                    if (client != null)
                    {
                        this._clients.put(sc, client);
                        System.out.printf
                        (
                            "Client %s accepted from %s%n",
                            client.getSessionID(),
                            client.getRemoteAddress()
                        );
                    }
                    else
                    {
                        System.err.printf("Client from %s rejected!%n", sc.getRemoteAddress());
                        try
                        {
                            sc.socket().setSoLinger(true, 5);
                            sc.close();
                        }
                        catch (IOException ioex)
                        {
                            System.err.printf
                            (
                                "Client from %s closing error %s!%n",
                                sc.getRemoteAddress(),
                                ioex.getMessage()
                            );
                        }
                    }
                    System.out.println("Awaiting new client connection!");
                }
                else if (key.isReadable())
                {
                    // Hand the read off to a worker thread from the pool.
                    SocketChannel sc = (SocketChannel)key.channel();
                    var client = this._clients.get(sc);
                    if (client != null)
                        this._workers.execute(client);
                    else
                        sc.close();
                }
            }
        }
        catch (IOException ioex)
        {
            try
            {
                this.close();
            }
            catch (IOException ioex2)
            {
                ioex2.printStackTrace();
            }
        }
        catch (ClosedSelectorException csex)
        {
            // The multiplexer (selector) was closed,
            // therefore this thread can finally be terminated.
            break;
        }
        catch (CancelledKeyException ckex)
        {
            // Do nothing for this exception; the key operation was canceled.
            // This does not mean the server has a problem or was shut down.
        }
    }
}
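The loop above relies on fields (_selector, _channel) whose setup I left out. As a rough idea of what that setup could look like, here is a minimal self-contained echo server. It is only a sketch and not my actual server: the class NioEchoSketch and its roundTrip demo helper are invented for this example, and it echoes inline on the selector thread instead of using the worker pool.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class NioEchoSketch implements Runnable, AutoCloseable
{
    private final Selector selector;
    private final ServerSocketChannel channel;

    NioEchoSketch(final int port) throws IOException
    {
        this.selector = Selector.open();
        this.channel = ServerSocketChannel.open();
        this.channel.configureBlocking(false);          // must be non-blocking before register()
        this.channel.bind(new InetSocketAddress(port)); // port 0 picks any free port
        this.channel.register(this.selector, SelectionKey.OP_ACCEPT);
    }

    int port()
    {
        return this.channel.socket().getLocalPort();
    }

    @Override
    public void run()
    {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try
        {
            while (this.selector.isOpen())
            {
                this.selector.select();
                Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
                while (keys.hasNext())
                {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isAcceptable())
                    {
                        SocketChannel sc = this.channel.accept();
                        if (sc != null)
                        {
                            sc.configureBlocking(false);
                            sc.register(this.selector, SelectionKey.OP_READ);
                        }
                    }
                    else if (key.isReadable())
                    {
                        SocketChannel sc = (SocketChannel) key.channel();
                        buf.clear();
                        if (sc.read(buf) == -1) { sc.close(); continue; }
                        buf.flip();
                        // Echo inline; a real server would not spin on write like this.
                        while (buf.hasRemaining()) sc.write(buf);
                    }
                }
            }
        }
        catch (IOException | ClosedSelectorException ex)
        {
            // Selector closed or I/O failure: let this thread end.
        }
    }

    @Override
    public void close() throws IOException
    {
        this.selector.close(); // wakes up a blocked select()
        this.channel.close();
    }

    /** Demo helper: start the server, send one message, return what comes back. */
    static String roundTrip(final String msg)
    {
        try (NioEchoSketch server = new NioEchoSketch(0))
        {
            Thread t = new Thread(server);
            t.setDaemon(true);
            t.start();
            byte[] out = msg.getBytes(StandardCharsets.UTF_8);
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", server.port())))
            {
                client.write(ByteBuffer.wrap(out)); // blocking channel: writes everything
                ByteBuffer in = ByteBuffer.allocate(out.length);
                while (in.hasRemaining() && client.read(in) != -1) { /* keep reading */ }
                return new String(in.array(), 0, in.position(), StandardCharsets.UTF_8);
            }
        }
        catch (IOException ex)
        {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args)
    {
        System.out.println(roundTrip("hello"));
    }
}
```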
Next came implementing the SSL socket and the remote peer handler for each connection. Getting the SSLEngine to work took some real documentation digging, but I got it working with the following code.
/**
 * Main listener thread for this client.
 */
@Override
public final void run()
{
    this.decrypt();
}
/**
 * Handles the handshake.
 * @throws java.lang.InterruptedException
 * @throws java.io.IOException
 */
public void handShake() throws InterruptedException, IOException
{
    System.out.printf("SSL handShake %s... Start%n", this._id);
    while (true)
    {
        if (this._isHandshakeRxedHello)
        {
            HandshakeStatus status = this._sslEngine.getHandshakeStatus();
            if (null != status)
                switch (status)
                {
                    case FINISHED:
                        System.out.printf("SSL handShake %s... Finish HANDSHAKE INTERRUPTED%n", this._id);
                        return;
                    case NOT_HANDSHAKING:
                        if (this._isHandshakeRxedHello)
                        {
                            System.out.printf("SSL handShake %s... Finish OK%n", this._id);
                            this.onConnected();
                            return;
                        }
                        break;
                    case NEED_WRAP:
                        // The engine has handshake data to send; wrap an empty buffer.
                        this.encrypt(ByteBuffer.allocate(0));
                        break;
                    case NEED_TASK:
                        this._sslEngine.getDelegatedTask().run();
                        break;
                    default:
                        break;
                }
        }
        else
        {
            if (this._channel.socket().isClosed())
            {
                System.out.printf("SSL handShake %s... Finish ERROR SOCKET CLOSE%n", this._id);
                return;
            }
        }
        Thread.sleep(100);
    }
}
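/**
 * Decrypt RX cipher data, and process the plain text data.
 */
private synchronized void decrypt()
{
    try
    {
        // _rxCyper is kept in write mode between calls (see compact() below).
        int len = this._channel.read(this._rxCyper);
        if (len == -1)
        {
            this.cleanUpAndClose();
        }
        else if (len > 0)
        {
            this._rxCyper.flip();
            this._rxBuffer.clear();
            this._tsLastRx = Clock.systemUTC().instant().getEpochSecond();
            SSLEngineResult results;
            while (this._rxCyper.hasRemaining())
            {
                results = this._sslEngine.unwrap(this._rxCyper, this._rxBuffer);
                if (results.getStatus() == SSLEngineResult.Status.BUFFER_UNDERFLOW)
                {
                    // Incomplete TLS record: stop and wait for the next read.
                    break;
                }
                if (results.bytesProduced() > 0)
                {
                    this._rxBuffer.flip();
                    this.onReceived(this._rxBuffer);
                    // Reset the plain text buffer before the next unwrap.
                    this._rxBuffer.clear();
                }
                else
                    this._isHandshakeRxedHello = true;
            }
            // Keep any unread bytes (a partial record) for the next read.
            this._rxCyper.compact();
        }
    }
    catch (IOException ex)
    {
        this.cleanUpAndClose();
    }
}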
/**
 * Encrypt data and TX cipher text to the remote peer.
 * @param data the plain text data to be sent.
 * @throws SSLException
 * @throws IOException
 */
private synchronized void encrypt(final ByteBuffer data) throws SSLException, IOException
{
    this._txCyper.clear();
    var status = this._sslEngine.wrap(data, this._txCyper);
    if (status.getStatus() == SSLEngineResult.Status.OK)
    {
        this._txCyper.flip();
        this._channel.write(this._txCyper);
        if (status.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
            this.encrypt(data);
    }
    else if (status.getStatus() == SSLEngineResult.Status.BUFFER_OVERFLOW)
    {
        // The TX buffer was too small; retry with a buffer sized for a full TLS packet.
        var b = ByteBuffer.allocate(this._sslEngine.getSession().getPacketBufferSize());
        status = this._sslEngine.wrap(data, b);
        if (status.getStatus() == SSLEngineResult.Status.OK)
        {
            b.flip();
            this._channel.write(b);
        }
        else
        {
            throw new SSLException("Failed to encrypt!");
        }
    }
}
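What the code above does not show is where _sslEngine and the cipher/plain buffers come from. A minimal sketch of one possible setup follows, assuming a PKCS#12 keystore like the "cert.p12" used further down. The class SslEngineSetupSketch and all of its method names are hypothetical, not taken from my server.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

final class SslEngineSetupSketch
{
    private SslEngineSetupSketch() {}

    /** Builds a TLSv1.3 SSLContext from a PKCS#12 file (for example "cert.p12"). */
    static SSLContext contextFromP12(final Path p12, final char[] password)
        throws IOException, GeneralSecurityException
    {
        KeyStore ks = KeyStore.getInstance("PKCS12");
        try (InputStream in = Files.newInputStream(p12))
        {
            ks.load(in, password);
        }
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, password);
        SSLContext ctx = SSLContext.getInstance("TLSv1.3");
        ctx.init(kmf.getKeyManagers(), null, null); // default trust managers and RNG
        return ctx;
    }

    /** Creates an engine that plays the server role in the handshake. */
    static SSLEngine serverEngine(final SSLContext ctx)
    {
        SSLEngine engine = ctx.createSSLEngine();
        engine.setUseClientMode(false);
        return engine;
    }

    /** Buffer large enough for one encrypted TLS packet (cipher side). */
    static ByteBuffer packetBuffer(final SSLEngine engine)
    {
        return ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
    }

    /** Buffer large enough for the plain text of one TLS packet. */
    static ByteBuffer appBuffer(final SSLEngine engine)
    {
        return ByteBuffer.allocate(engine.getSession().getApplicationBufferSize());
    }

    /** Demo only: uses the JVM default context, so no certificate file is needed. */
    static SSLEngine demoEngine()
    {
        try
        {
            return serverEngine(SSLContext.getDefault());
        }
        catch (GeneralSecurityException ex)
        {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args)
    {
        SSLEngine engine = demoEngine();
        System.out.println("packet buffer: " + packetBuffer(engine).capacity());
        System.out.println("app buffer:    " + appBuffer(engine).capacity());
    }
}
```

Sizing the buffers from the engine's session is what makes the BUFFER_OVERFLOW retry in encrypt() a rare path rather than the normal one.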
This is the main loop of the NIO remote peer connection handler without SSL. It pulls new data and runs the onReceived logic on a thread from the pool; these methods are non-blocking.
/**
 * Main listener thread for this client.
 */
@Override
public final void run()
{
    this.rxhandler();
}
/**
 * Low level RX handler.
 */
private synchronized void rxhandler()
{
    try
    {
        this._rxBuff.clear();
        int len = this._channel.read(this._rxBuff);
        if (len == -1)
        {
            this.cleanUpAndClose();
            return;
        }
        else if (len > 0)
        {
            this._tsLastRx = Clock.systemUTC().instant().getEpochSecond();
            this._rxBuff.flip();
            this.onReceived(this._rxBuff);
        }
    }
    catch (IOException ex)
    {
        this.cleanUpAndClose();
    }
}
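The clear / read / flip sequence in the handler is easy to get wrong, so here it is in isolation. This is just an illustration: BufferCycleSketch is a made-up class, and put() stands in for the channel read.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class BufferCycleSketch
{
    private BufferCycleSketch() {}

    /** Simulates one receive: fill the buffer, flip it, then drain it as text. */
    static String receive(final ByteBuffer buf, final byte[] incoming)
    {
        buf.clear();       // write mode: position = 0, limit = capacity
        buf.put(incoming); // stands in for channel.read(buf)
        buf.flip();        // read mode: limit = bytes written, position = 0
        byte[] out = new byte[buf.remaining()];
        buf.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args)
    {
        ByteBuffer buf = ByteBuffer.allocate(32);
        System.out.println(receive(buf, "first message".getBytes(StandardCharsets.UTF_8)));
        // The same buffer is reused; clear() keeps the old bytes from leaking through.
        System.out.println(receive(buf, "two".getBytes(StandardCharsets.UTF_8)));
    }
}
```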
Finally, here is how to start one of the 4 implementations of the server.
Obviously there is a lot of code I left out, but this should help anyone else trying to implement a non-blocking server using NIO and a thread pool.
ITCPServer server = null;
boolean run = true;
try
{
    if (nio)
    {
        if (ssl)
        {
            System.out.println("Starting NIO:SSL server!");
            server = NioTCPServer.create
            (
                listeningPort,
                100000,
                new NioTCPServerTLSv13AcceptorFactory
                (
                    new NioDebugTCPRemClientFactory(),
                    "cert.p12",
                    "password"
                )
            );
            server.setClientTimeout(0);
        }
        else
        {
            System.out.println("Starting NIO server!");
            server = NioTCPServer.create
            (
                listeningPort,
                100000,
                new NioTCPServerAcceptorFactory
                (
                    new NioDebugTCPRemClientFactory()
                )
            );
            server.setClientTimeout(0);
        }
    }
    else
    {
        if (ssl)
        {
            System.out.println("Starting SSL server!");
            server = TCPServer.createSSL
            (
                listeningPort,
                100000,
                new DebugTCPRemClientFactory(),
                "cert.p12",
                "password"
            );
        }
        else
        {
            System.out.println("Starting server!");
            server = TCPServer.create
            (
                listeningPort,
                100000,
                new DebugTCPRemClientFactory()
            );
        }
    }
    Thread t = new Thread(server);
    t.start();
    System.out.printf("Server started listening at %s:%d%n", InetAddress.getLocalHost(), listeningPort);