I have a game server that runs a cycle every 600 milliseconds. During each cycle it manipulates a byte array, and at the end of the cycle it writes that byte array to the client.

Because I can't know in advance how many bytes will need to be written, I create a new byte buffer for each write at the end of the cycle. What I'm unsure of is whether making that buffer direct would be faster.

From what I could gather from other similar questions, direct might be better, since a temporary direct byte buffer may be created anyway when a non-direct buffer is written to the SocketChannel. Any further clarification would be great.
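To make the two options concrete, here is a minimal sketch of both ways to turn the cycle's byte array into a buffer. The class and method names (`BufferChoices`, `asHeapView`, `asDirectCopy`) are made up for illustration; only the `ByteBuffer` calls are real JDK API. It says nothing about which option is faster, which would need a benchmark.

```java
import java.nio.ByteBuffer;

// Hypothetical helper names; only the ByteBuffer calls are real JDK API.
final class BufferChoices {
    // Heap buffer: a view over the existing array, no bytes are copied here.
    static ByteBuffer asHeapView(byte[] bytes, int offset, int length) {
        return ByteBuffer.wrap(bytes, offset, length);
    }

    // Direct buffer: allocates native memory and copies the bytes into it.
    static ByteBuffer asDirectCopy(byte[] bytes, int offset, int length) {
        ByteBuffer direct = ByteBuffer.allocateDirect(length);
        direct.put(bytes, offset, length);
        direct.flip(); // switch from filling to draining
        return direct;
    }

    public static void main(String[] args) {
        byte[] cycleData = {10, 20, 30, 40, 50};
        ByteBuffer heap = asHeapView(cycleData, 1, 3);
        ByteBuffer direct = asDirectCopy(cycleData, 1, 3);
        System.out.println("heap:   direct=" + heap.isDirect() + ", remaining=" + heap.remaining());
        System.out.println("direct: direct=" + direct.isDirect() + ", remaining=" + direct.remaining());
    }
}
```

One thing to keep in mind: the heap view shares the array, so if the game loop keeps mutating that array before the selector thread has finished writing, the bytes on the wire change too. The direct copy is a snapshot.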

In case my question wasn't clear enough, here's the code behind my networking: https://pastebin.com/M9wm88BA

package com.palidino.nio;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class NioServer implements Runnable {
    private List<Session> sessions = new ArrayList<>();
    private Map<String, Integer> connectionCounts = new HashMap<>();
    private InetSocketAddress hostAddress;
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private SessionHandler sessionHandler;
    private boolean running;

    private ByteBuffer readBuffer;
    private byte[] readBytes;

    private int sessionIdleTimeout;
    private int maxConnectionsPerIPAddress;
    private int socketBufferSize = 16384;

    public NioServer() throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
    }

    public void start(String remoteAddress, int port) throws IOException {
        if (hostAddress != null) {
            throw new IllegalStateException("Server already started");
        }
        if (sessionHandler == null) {
            throw new IllegalStateException("SessionHandler can't be null");
        }
        readBuffer = ByteBuffer.allocateDirect(socketBufferSize);
        readBytes = new byte[socketBufferSize];
        hostAddress = new InetSocketAddress(remoteAddress, port);
        serverSocketChannel.socket().setReceiveBufferSize(socketBufferSize);
        serverSocketChannel.socket().bind(hostAddress);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Starting server on " + remoteAddress + ":" + port);
        new Thread(this, "NioServer").start();
    }

    public void stop() {
        try {
            if (serverSocketChannel != null) {
                serverSocketChannel.close();
                serverSocketChannel = null;
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }

    public void setSessionHandler(SessionHandler sessionHandler) {
        this.sessionHandler = sessionHandler;
    }

    public void setSessionIdleTimeout(int seconds) {
        if (hostAddress != null) {
            throw new IllegalStateException("Server already started");
        }
        if (seconds <= 0) {
            throw new IllegalArgumentException("seconds must be greater than 0");
        }
        sessionIdleTimeout = seconds * 1000;
    }

    public void setMaxConnectionsPerIPAddress(int maxConnectionsPerIPAddress) {
        if (hostAddress != null) {
            throw new IllegalStateException("Server already started");
        }
        if (maxConnectionsPerIPAddress <= 0) {
            throw new IllegalArgumentException("maxConnectionsPerIPAddress must be greater than 0");
        }
        this.maxConnectionsPerIPAddress = maxConnectionsPerIPAddress;
    }

    public void setSocketBufferSize(int socketBufferSize) throws IOException {
        if (hostAddress != null) {
            throw new IllegalStateException("Server already started");
        }
        if (socketBufferSize <= 0) {
            throw new IllegalArgumentException("size must be greater than 0");
        }
        this.socketBufferSize = socketBufferSize;
    }

    @Override
    public void run() {
        if (running) {
            throw new IllegalStateException("Server is already running");
        }
        running = true;
        while (serverSocketChannel.isOpen()) {
            cycle();
        }
        running = false;
    }

    private void cycle() {
        try {
            selector.select();
            for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext();) {
                SelectionKey selectionKey = it.next();
                it.remove();
                Session session = null;
                try {
                    if (serverSocketChannel == null || !serverSocketChannel.isOpen()) {
                        break;
                    }
                    session = (Session) selectionKey.attachment();
                    if (selectionKey.isValid() && selectionKey.isAcceptable()) {
                        session = accept(selectionKey);
                    }
                    if (session == null) {
                        continue;
                    }
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        read(selectionKey);
                    }
                    if (selectionKey.isValid() && selectionKey.isWritable()) {
                        write(selectionKey);
                    }
                } catch (Exception e2) {
                    error(e2, session);
                }
            }
            checkSessions();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Session accept(SelectionKey selectionKey) throws IOException {
        Session session = null;
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.socket().setSendBufferSize(socketBufferSize);
        socketChannel.configureBlocking(false);
        String remoteAddress = socketChannel.socket().getInetAddress().getHostAddress();
        int connectionCount = getConnectionCount(remoteAddress);
        if (maxConnectionsPerIPAddress > 0 && connectionCount >= maxConnectionsPerIPAddress) {
            socketChannel.close();
        } else {
            connectionCounts.put(remoteAddress, connectionCount + 1);
            session = new Session(socketChannel, remoteAddress, socketChannel.register(selector, SelectionKey.OP_READ));
            sessionHandler.accept(session);
            sessions.add(session);
        }
        return session;
    }

    private void read(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (!socketChannel.isOpen()) {
            return;
        }
        Session session = (Session) selectionKey.attachment();
        readBuffer.clear();
        int numberBytesRead;
        ByteArrayOutputStream readStream = new ByteArrayOutputStream();
        while ((numberBytesRead = socketChannel.read(readBuffer)) > 0) {
            readBuffer.flip();
            readBuffer.get(readBytes, 0, numberBytesRead);
            readStream.write(readBytes, 0, numberBytesRead);
            readBuffer.clear();
            session.updateLastRead();
        }
        if (readStream.size() > 0) {
            sessionHandler.read(session, readStream.toByteArray());
        }
        if (numberBytesRead == -1) {
            session.close();
        }
    }

    private void write(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (!socketChannel.isOpen()) {
            return;
        }
        Session session = (Session) selectionKey.attachment();
        if (session.getWriteEvents().isEmpty()) {
            return;
        }
        try {
            while (!session.getWriteEvents().isEmpty()) {
                WriteEvent writeEvent = session.getWriteEvents().peek();
                socketChannel.write(writeEvent.getBuffer());
                if (writeEvent.getBuffer().remaining() > 0) {
                    break;
                }
                if (writeEvent.getHandler() != null) {
                    writeEvent.getHandler().complete(session, true);
                }
                session.getWriteEvents().poll();
            }
        } catch (Exception e) {
            error(e, session);
        }
        if (selectionKey.isValid() && session.getWriteEvents().isEmpty()) {
            selectionKey.interestOps(SelectionKey.OP_READ);
        }
    }

    private void error(Exception exception, Session session) throws IOException {
        try {
            sessionHandler.error(exception, session);
        } catch (Exception e) {
            if (session != null) {
                session.close();
            }
            e.printStackTrace();
        }
    }

    private void checkSessions() {
        if (sessions.isEmpty()) {
            return;
        }
        for (Iterator<Session> it = sessions.iterator(); it.hasNext();) {
            Session session = it.next();
            SelectionKey selectionKey = session.getSelectionKey();
            if (selectionKey.isValid() && !session.getWriteEvents().isEmpty()) {
                selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            }
            if (session.idleTimeout(sessionIdleTimeout)) {
                session.close();
            }
            if (session.isOpen()) {
                continue;
            }
            String remoteAddress = session.getRemoteAddress();
            int connectionCount = getConnectionCount(remoteAddress);
            if (connectionCount > 1) {
                connectionCounts.put(remoteAddress, connectionCount - 1);
            } else {
                connectionCounts.remove(remoteAddress);
            }
            if (sessionHandler != null) {
                sessionHandler.closed(session);
            }
            if (selectionKey.isValid()) {
                selectionKey.cancel();
            }
            while (!session.getWriteEvents().isEmpty()) {
                WriteEvent writeEvent = session.getWriteEvents().poll();
                if (writeEvent.getHandler() != null) {
                    writeEvent.getHandler().complete(session, false);
                }
            }
            it.remove();
        }
    }

    private int getConnectionCount(String remoteAddress) {
        return connectionCounts.containsKey(remoteAddress) ? connectionCounts.get(remoteAddress) : 0;
    }

    public void printStats() {
        System.out
                .println("NIOServer: sessions: " + sessions.size() + "; connectionCounts: " + connectionCounts.size());
    }
}

https://pastebin.com/TxPQN7JZ

package com.palidino.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Session {
    private SocketChannel socketChannel;
    private SelectionKey selectionKey;
    private String remoteAddress;
    private long lastRead;
    private Queue<WriteEvent> writeEvents = new ConcurrentLinkedQueue<>();
    private Object attachment;

    public Session(SocketChannel socketChannel, String remoteAddress, SelectionKey selectionKey) {
        this.socketChannel = socketChannel;
        this.remoteAddress = remoteAddress;
        this.selectionKey = selectionKey;
        selectionKey.attach(this);
        updateLastRead();
    }

    public void write(byte[] bytes) {
        write(bytes, 0, bytes.length, null);
    }

    public void write(byte[] bytes, WriteEventHandler handler) {
        write(bytes, 0, bytes.length, handler);
    }

    public void write(byte[] bytes, int offset, int length) {
        write(bytes, offset, length, null);
    }

    public void write(byte[] bytes, int offset, int length, WriteEventHandler handler) {
        addWriteEvent(new WriteEvent(bytes, offset, length, handler));
    }

    public void write(ByteBuffer buffer) {
        write(buffer, null);
    }

    public void write(ByteBuffer buffer, WriteEventHandler handler) {
        addWriteEvent(new WriteEvent(buffer, handler));
    }

    private void addWriteEvent(WriteEvent writeEvent) {
        writeEvents.offer(writeEvent);
        if (selectionKey.isValid()) {
            selectionKey.selector().wakeup();
        }
    }

    public void close() {
        try {
            socketChannel.close();
        } catch (IOException ioe) {
        }
    }

    public boolean isOpen() {
        return socketChannel.isOpen();
    }

    public SocketChannel getSocketChannel() {
        return socketChannel;
    }

    public String getRemoteAddress() {
        return remoteAddress;
    }

    public long getLastRead() {
        return lastRead;
    }

    public boolean idleTimeout(int timeoutMillis) {
        return timeoutMillis > 0 && System.currentTimeMillis() - lastRead > timeoutMillis;
    }

    public void setAttachment(Object attachment) {
        this.attachment = attachment;
    }

    public Object getAttachment() {
        return attachment;
    }

    SelectionKey getSelectionKey() {
        return selectionKey;
    }

    void updateLastRead() {
        lastRead = System.currentTimeMillis();
    }

    Queue<WriteEvent> getWriteEvents() {
        return writeEvents;
    }
}

https://pastebin.com/r37vPUtJ

package com.palidino.nio;

import java.nio.ByteBuffer;

public class WriteEvent {
    private ByteBuffer buffer;
    private WriteEventHandler handler;

    public WriteEvent(byte[] original, int offset, int length, WriteEventHandler handler) {
        if (original == null) {
            throw new NullPointerException("original can't be null");
        }
        if (offset < 0 || length < 0) {
            throw new NegativeArraySizeException("offset and length must not be negative");
        }
        if (offset > original.length || length + offset > original.length) {
            throw new ArrayIndexOutOfBoundsException("length + offset can't be greater than original.length");
        }
        if (original.length == 0 || length == 0) {
            throw new IllegalArgumentException("length must be greater than 0");
        }
        buffer = ByteBuffer.allocateDirect(length);
        buffer.put(original, offset, length);
        buffer.flip();
        buffer = buffer.asReadOnlyBuffer();
        this.handler = handler;
    }

    public WriteEvent(ByteBuffer original, WriteEventHandler handler) {
        buffer = ByteBuffer.allocateDirect(original.capacity());
        ByteBuffer readOnlyCopy = original.asReadOnlyBuffer();
        readOnlyCopy.flip();
        buffer.put(readOnlyCopy);
        buffer.flip();
        buffer = buffer.asReadOnlyBuffer();
        this.handler = handler;
    }

    ByteBuffer getBuffer() {
        return buffer;
    }

    WriteEventHandler getHandler() {
        return handler;
    }
}

As an alternative to creating a direct buffer for each write, I could keep a single direct buffer in my selector thread, sized so that the fromBuffer should rarely if ever need a limit placed on it while writing:

private void write(SelectionKey selectionKey) throws IOException {
    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
    if (!socketChannel.isOpen()) {
        return;
    }
    Session session = (Session) selectionKey.attachment();
    if (session.getWriteEvents().isEmpty()) {
        return;
    }
    try {
        while (!session.getWriteEvents().isEmpty()) {
            WriteEvent writeEvent = session.getWriteEvents().peek();
            ByteBuffer fromBuffer = writeEvent.getBuffer();
            int position = fromBuffer.position();
            int limit = fromBuffer.limit();
            try {
                do {
                    directBuffer.clear();
                    if (fromBuffer.remaining() > directBuffer.remaining()) {
                        // limit is an absolute index, so cap it at position + free space
                        fromBuffer.limit(fromBuffer.position() + directBuffer.remaining());
                    }
                    directBuffer.put(fromBuffer);
                    directBuffer.flip();
                    position += socketChannel.write(directBuffer);
                    fromBuffer.position(position);
                    fromBuffer.limit(limit);
                } while (directBuffer.remaining() == 0 && fromBuffer.remaining() > 0);
                if (fromBuffer.remaining() > 0) {
                    break;
                }
                if (writeEvent.getHandler() != null) {
                    writeEvent.getHandler().complete(session, true);
                }
                session.getWriteEvents().poll();
            } catch (Exception e2) {
                fromBuffer.position(position);
                fromBuffer.limit(limit);
                throw e2;
            }
        }
    } catch (Exception e) {
        error(e, session);
    }
    if (selectionKey.isValid() && session.getWriteEvents().isEmpty()) {
        selectionKey.interestOps(SelectionKey.OP_READ);
    }
}
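The chunking idea in that method can be tried without a socket. Below is a minimal sketch, assuming a reusable scratch buffer with a capacity greater than zero; `ChunkedWriter`, `writeThrough` and `copyInChunks` are hypothetical names, and a `ByteArrayOutputStream` channel stands in for the `SocketChannel`. Note that `limit` is an absolute index, so the cap has to be `position + chunk`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;

// Hypothetical names throughout; only the java.nio / java.io calls are real API.
final class ChunkedWriter {
    // Copies 'from' into 'channel' through 'scratch' and returns the bytes written.
    // Stops early if the channel takes only part of a chunk (the non-blocking case).
    // Assumes scratch.capacity() > 0.
    static int writeThrough(ByteBuffer from, ByteBuffer scratch, WritableByteChannel channel)
            throws IOException {
        final int originalLimit = from.limit();
        int total = 0;
        while (from.hasRemaining()) {
            int start = from.position();
            scratch.clear();
            int chunk = Math.min(from.remaining(), scratch.remaining());
            from.limit(start + chunk);       // absolute index: position + chunk size
            scratch.put(from);
            from.limit(originalLimit);       // restore before doing any I/O
            scratch.flip();
            int written = channel.write(scratch);
            from.position(start + written);  // un-consume bytes the channel did not take
            total += written;
            if (scratch.hasRemaining()) {
                break;                       // channel is full; retry on the next OP_WRITE
            }
        }
        return total;
    }

    // Demo wrapper: pushes 'payload' through a small direct scratch buffer.
    static byte[] copyInChunks(byte[] payload, int scratchSize) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        ByteBuffer from = ByteBuffer.wrap(payload).asReadOnlyBuffer();
        ByteBuffer scratch = ByteBuffer.allocateDirect(scratchSize);
        try {
            writeThrough(from, scratch, Channels.newChannel(sink));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = new byte[10];
        for (int i = 0; i < payload.length; i++) {
            payload[i] = (byte) i;
        }
        byte[] copy = copyInChunks(payload, 4); // forces three chunks: 4 + 4 + 2
        System.out.println("round trip intact: " + Arrays.equals(payload, copy));
    }
}
```

The scratch buffer here is deliberately tiny so the chunking path is exercised; in the server it would be sized so that most write events fit in one pass.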
PalidinoDH
  • Using [Netty](https://netty.io/) is not an option? I know that may seem irrelevant but it would simplify a lot of this for you. Just sayin' – Sean Bright Jun 05 '19 at 20:38
  • just benchmark it, it's fun! – aran Jun 05 '19 at 20:42
  • While it is an option, my own code already does everything I need it to do, and having already used it, works perfectly fine in my experience. I'm mostly just looking to see if there were any optimizations I could make, specifically in how I allocate buffers. – PalidinoDH Jun 05 '19 at 20:45
  • There is no advantage to a direct byte buffer unless you are just copying between channels. Whichever you choose, use a single `ByteBuffer` for the life of the channel. Make it big enough to hold any possible message. The reason 'due to the uncertain nature of how many bytes need to be written' doesn't fly. – user207421 Jun 06 '19 at 01:16

1 Answer

What I'm uncertain of is if it would be faster making it direct or not.

Reading into a direct buffer can spare you an extra copy and a native memory allocation, which makes it faster and costs fewer CPU cycles. But if you want to process the data you read into the direct buffer, you need to copy it into a byte[] anyway.
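As a rough illustration of that last point, here is a sketch of reading into a reusable direct buffer and then copying the bytes onto the heap for processing. `DirectReadDemo`, `readOnce` and `readAll` are hypothetical names, and the channel is built from a `ByteArrayInputStream` purely so the example runs without a socket; with such a channel the direct buffer brings no benefit of its own.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Arrays;

// Hypothetical names; a stream-backed channel stands in for a SocketChannel.
final class DirectReadDemo {
    // Performs one read into 'direct' and returns whatever arrived as a heap array.
    static byte[] readOnce(ReadableByteChannel channel, ByteBuffer direct) throws IOException {
        direct.clear();
        int n = channel.read(direct);
        if (n <= 0) {
            return new byte[0]; // nothing available, or end of stream
        }
        direct.flip();
        byte[] onHeap = new byte[direct.remaining()];
        direct.get(onHeap); // this is the copy needed before the data can be processed as a byte[]
        return onHeap;
    }

    // Demo wrapper that hides the checked exception.
    static byte[] readAll(byte[] input, int bufferSize) {
        ByteBuffer direct = ByteBuffer.allocateDirect(bufferSize);
        try (ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(input))) {
            return readOnce(channel, direct);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] received = readAll(new byte[] {1, 2, 3, 4, 5}, 16);
        System.out.println(Arrays.toString(received));
    }
}
```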

Reading into a heap buffer involves reading into a temporary direct buffer and then copying its contents back onto the heap. The difference between the two is that a direct buffer's contents are malloc'ed (i.e. they live off the Java heap), whereas a heap buffer is backed by a byte[]. You cannot obtain a raw pointer to the byte[]'s contents and perform I/O into it without copying, because the GC can move objects around the heap, and I/O through a stale pointer would corrupt the heap.
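The backing-store difference is visible from plain Java. The sketch below uses a made-up class name (`BackingStoreDemo`) and relies on the behaviour of OpenJDK, where a direct buffer does not expose a backing array; other implementations are allowed to differ.

```java
import java.nio.ByteBuffer;

// Hypothetical class name; isDirect(), hasArray() and array() are real ByteBuffer API.
final class BackingStoreDemo {
    static String describe(ByteBuffer buffer) {
        String kind = buffer.isDirect() ? "direct" : "heap";
        String store = buffer.hasArray() ? "backed by an accessible byte[]" : "no accessible byte[]";
        return kind + ", " + store;
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.allocate(8);
        ByteBuffer direct = ByteBuffer.allocateDirect(8);

        // The heap buffer is a thin wrapper: writing to its array changes the buffer.
        heap.array()[0] = 42;
        System.out.println(describe(heap) + ", first byte = " + heap.get(0));

        // On OpenJDK the direct buffer has no array; calling array() would throw
        // UnsupportedOperationException, so bytes must be copied out with get().
        System.out.println(describe(direct));
    }
}
```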

Although JNI has a "critical region" inside which you can obtain that raw pointer without making a copy, it has an impact on GC, since the object is either pinned to its current location or GC is disabled completely. I experimented with this a while ago: Measuring performance of java.io.InputStream

Some Name
  • Given that reading into a heap buffer will allocate temporary direct buffer objects, wouldn't it be better to just read into a direct buffer and copy it (`get()`) to a byte[] when necessary? – experiment unit 1998X May 15 '23 at 02:09
  • @experimentunit1998X Not necessarily, because `FileChannelImpl` uses a buffer cache under the hood when [performing write or read](https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/sun/nio/ch/IOUtil.java#L94). It tries to [find a suitable buffer](https://github.com/openjdk/jdk/blob/97b2ca3de76046c6f52d3649d8787feea7b9ac83/src/java.base/share/classes/sun/nio/ch/Util.java#L230) first and only then allocates a new one. – Some Name May 15 '23 at 14:36
  • I see, so if I am going to be getting bytes from a direct byte buffer into a byte[], does that mean that I should probably just proceed with an on-heap byte buffer? I noticed there isn't anything that would "clear" the cached buffer under the hood. What would happen if the temporary direct buffer becomes full? – experiment unit 1998X May 15 '23 at 15:59
  • @experimentunit1998X When requesting a buffer from the cache it's already [prepared](https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/sun/nio/ch/Util.java#L169) to accept the desired number of bytes. Calling `clear()` explicitly is not necessary to reset the `position` and `limit` of a buffer. – Some Name May 15 '23 at 22:02
  • much appreciated. I understand that now – experiment unit 1998X May 16 '23 at 01:10
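
The cache behaviour discussed in these comments can be pictured with a deliberately simplified sketch. This is an illustration of the idea only, not the JDK's `sun.nio.ch.Util` code: `TempBufferCache`, `acquire`, `release` and the `allocations` counter are all invented here, and the sketch assumes single-threaded use.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Iterator;

// Hypothetical, simplified illustration; NOT the JDK's sun.nio.ch.Util implementation.
final class TempBufferCache {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    int allocations = 0; // exposed only so the demo can observe reuse

    // Looks for a cached buffer that is large enough; allocates only if none fits.
    ByteBuffer acquire(int size) {
        for (Iterator<ByteBuffer> it = free.iterator(); it.hasNext();) {
            ByteBuffer candidate = it.next();
            if (candidate.capacity() >= size) {
                it.remove();
                candidate.rewind();    // position back to 0
                candidate.limit(size); // ready to accept exactly 'size' bytes
                return candidate;
            }
        }
        allocations++;
        return ByteBuffer.allocateDirect(size);
    }

    // Hands a buffer back so a later acquire() can reuse it.
    void release(ByteBuffer buffer) {
        free.addFirst(buffer);
    }

    public static void main(String[] args) {
        TempBufferCache cache = new TempBufferCache();
        ByteBuffer first = cache.acquire(16);
        first.put(new byte[16]);              // fill it completely
        cache.release(first);
        ByteBuffer second = cache.acquire(8); // reuses the same buffer, already reset
        System.out.println("reused: " + (first == second)
                + ", position=" + second.position()
                + ", limit=" + second.limit()
                + ", allocations=" + cache.allocations);
    }
}
```

Because `acquire` resets `position` and `limit` itself, a buffer that was left full by its previous user is handed out ready for the requested number of bytes, which is why no separate `clear()` is needed in this scheme.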