I need to connect to 4 machines and read data from sockets. I've chosen to use an async model of nio2.
Here is a pseudo-code :
class Connector {
private final AsynchronousChannelGroup group;
private final String host;
private final int port
//...
public void connect() {
try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group)) {
client.connect(new InetSocketAddress(host(), port())).get(5, TimeUnit.SECONDS);
if (!group.isShutdown()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
client.read(buffer, 5, TimeUnit.SECONDS, new Attachement(buffer), new ReadHandler(client)); //#1
}
} catch (Exception e) {
//
}
}
private class ReadHandler implements CompletionHandler<Integer, Attachement> {
final AsynchronousSocketChannel channel;
@Override
public void completed(Integer result, Attachement attachment) {
attachment.buffer.clear();
channel.read(attachment.buffer, 5, TimeUnit.SECONDS, attachment, this);
}
@Override
public void failed(Throwable exc, Attachement attachment) {
closeChannel();
}
void closeChannel() {
try {
if (channel.isOpen()) {
channel.close();
}
} catch (IOException e) {
// do something else
}
}
}
}
To run this code I create a new group and passe it to all the Connector
s :
ExecutorService executor = Executors.newFixedThreadPool(4);
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executor);
//for each server
Connector connectorX = new Connector(group, serverX.host, serverX.port);
PROBLEM:
Threads are released after the first read (where you find //#1
comment).
SOLUTION !
To fix this I've introduced a CountDownLatch
to keep threads waiting for reading :
//...
private CountDownLatch latch;
public void connect() {
latch = new CountDownLatch(1);
try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group)) {
client.connect(new InetSocketAddress(host(), port())).get(5, TimeUnit.SECONDS);
if (!group.isShutdown()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
client.read(buffer, 5, TimeUnit.SECONDS, new Attachement(buffer), new ReadHandler(client)); //#1
}
latch.await();
} catch (Exception e) {
//...
latch.countDown();
}
}
QUESTION:
Is this (CountDownLatch
) the right way to fix the problem? If no what's the best way?
Blocking the thread (latch.await()
) is not in contradiction of the async model?