I need to connect to 4 machines and read data from their sockets. I've chosen to use the asynchronous channel API of NIO.2.

Here is the pseudo-code:

class Connector {

    private final AsynchronousChannelGroup group;
    private final String host;
    private final int port;
    //...
    
    public void connect() {
        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group)) {

            client.connect(new InetSocketAddress(host, port)).get(5, TimeUnit.SECONDS);

            if (!group.isShutdown()) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                client.read(buffer, 5, TimeUnit.SECONDS, new Attachement(buffer), new ReadHandler(client)); //#1
            }

        } catch (Exception e) {
            //
        }
    }

    private class ReadHandler implements CompletionHandler<Integer, Attachement> {

        final AsynchronousSocketChannel channel;

        ReadHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void completed(Integer result, Attachement attachment) {
            if (result < 0) { // end of stream: the peer closed the connection
                closeChannel();
                return;
            }
            attachment.buffer.clear();
            channel.read(attachment.buffer, 5, TimeUnit.SECONDS, attachment, this);
        }

        @Override
        public void failed(Throwable exc, Attachement attachment) {
            closeChannel();
        }

        void closeChannel() {
            try {
                if (channel.isOpen()) {
                    channel.close();
                }
            } catch (IOException e) {
                // do something else
            }
        }
    }
}

To run this code, I create a new group and pass it to all the Connectors:

ExecutorService executor = Executors.newFixedThreadPool(4);
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executor);

//for each server
Connector connectorX = new Connector(group, serverX.host, serverX.port);

PROBLEM:

The threads are released right after the first read call (the line marked with the //#1 comment).
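
One likely explanation, offered as an assumption and not confirmed anywhere in this post: the read at //#1 only starts the operation and returns at once, so the try-with-resources block ends and closes the channel while the read is still pending. The sketch below reproduces that on the loopback interface with the default channel group. `EarlyCloseDemo` and `readInsideTryWithResources` are hypothetical names, and the silent local server stands in for one of the 4 machines.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original Connector.
class EarlyCloseDemo {

    // Returns the Throwable passed to failed(), or null if the read completed normally.
    static Throwable readInsideTryWithResources() {
        CompletableFuture<Throwable> outcome = new CompletableFuture<>();
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // any free local port
            Future<AsynchronousSocketChannel> accepted = server.accept(); // this peer never writes

            try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                client.connect(server.getLocalAddress()).get(5, TimeUnit.SECONDS);
                // Starts the read and returns immediately, like the line marked //#1.
                client.read(ByteBuffer.allocate(1024), null, new CompletionHandler<Integer, Void>() {
                    @Override
                    public void completed(Integer result, Void attachment) {
                        outcome.complete(null);
                    }

                    @Override
                    public void failed(Throwable exc, Void attachment) {
                        outcome.complete(exc);
                    }
                });
            } // client.close() runs here while the read is still outstanding

            Throwable seen = outcome.get(5, TimeUnit.SECONDS);
            accepted.get(5, TimeUnit.SECONDS).close(); // release the server side of the connection
            return seen;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling `EarlyCloseDemo.readInsideTryWithResources()` should hand back an `AsynchronousCloseException`, which is what the channel documentation specifies for operations that are outstanding when the channel is closed.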

SOLUTION:

To fix this, I've introduced a CountDownLatch to keep the threads waiting for the reads:


//...
private CountDownLatch latch; 

public void connect() {

    latch = new CountDownLatch(1);

    try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group)) {

        client.connect(new InetSocketAddress(host, port)).get(5, TimeUnit.SECONDS);

        if (!group.isShutdown()) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            client.read(buffer, 5, TimeUnit.SECONDS, new Attachement(buffer), new ReadHandler(client)); //#1
        }

        latch.await(); // blocks here until something calls latch.countDown()

    } catch (Exception e) {
        //...
        latch.countDown();
    }
}

QUESTION:

Is this (CountDownLatch) the right way to fix the problem? If not, what is the best way?

Doesn't blocking the thread (latch.await()) contradict the async model?
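
For comparison, here is a minimal sketch of one alternative that avoids the latch. It is an illustration under stated assumptions, not a confirmed answer. The channel is opened without try-with-resources, the connect also uses a CompletionHandler, and the handlers close the channel on end of stream or failure. `NonBlockingConnector`, `LoopbackDemo`, `finish` and the `CompletableFuture` result are hypothetical names introduced for illustration. The payload is assumed to be UTF-8 text, and the default channel group is used to keep the example short.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of Connector: connect() never blocks; the handlers own the channel.
class NonBlockingConnector {

    private final StringBuilder received = new StringBuilder();
    private final CompletableFuture<String> done = new CompletableFuture<>();

    // Returns immediately; the future completes once the peer closes the connection.
    CompletableFuture<String> connect(SocketAddress address) {
        try {
            AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            channel.connect(address, buffer, new CompletionHandler<Void, ByteBuffer>() {
                @Override
                public void completed(Void result, ByteBuffer buf) {
                    channel.read(buf, 5, TimeUnit.SECONDS, buf, new ReadHandler(channel));
                }

                @Override
                public void failed(Throwable exc, ByteBuffer buf) {
                    finish(channel, exc);
                }
            });
        } catch (IOException e) {
            done.completeExceptionally(e);
        }
        return done;
    }

    private class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel channel;

        ReadHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void completed(Integer result, ByteBuffer buf) {
            if (result < 0) { // end of stream
                finish(channel, null);
                return;
            }
            buf.flip();
            received.append(StandardCharsets.UTF_8.decode(buf)); // assumes chunks split on character boundaries
            buf.clear();
            channel.read(buf, 5, TimeUnit.SECONDS, buf, this); // start the next read
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buf) {
            finish(channel, exc);
        }
    }

    // Closes the channel and publishes either the collected text or the error.
    private void finish(AsynchronousSocketChannel channel, Throwable error) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // nothing useful to do in this sketch
        }
        if (error == null) {
            done.complete(received.toString());
        } else {
            done.completeExceptionally(error);
        }
    }
}

// Hypothetical loopback server that sends "hello" and closes, to exercise the connector.
class LoopbackDemo {
    static String run() {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            Future<AsynchronousSocketChannel> accepted = server.accept();
            CompletableFuture<String> result = new NonBlockingConnector().connect(server.getLocalAddress());
            try (AsynchronousSocketChannel peer = accepted.get(5, TimeUnit.SECONDS)) {
                peer.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))).get(5, TimeUnit.SECONDS);
            } // closing the peer signals end of stream to the connector
            return result.get(5, TimeUnit.SECONDS); // only the demo blocks, to collect the result
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In this sketch no thread is parked per connection. The only blocking call is in `LoopbackDemo.run()`, where the caller collects the result. An application that has nothing else to do on its main thread could wait on the group with `awaitTermination` after `shutdown()` instead.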

flywell
  • The thread the `read()` is executed in will exit unless there is more code in it beyond what you've posted. Why exactly is this a problem? and what benefit do you anticipate by merely blocking it with a latch? – user207421 Feb 17 '22 at 02:43

0 Answers