I am reading "Netty in Action" (v5). In chapters 2.3 and 2.4 I tried the EchoServer and EchoClient examples. With a single client connected to the server, everything worked perfectly. I then modified the example so that multiple clients could connect to the server. My goal was to run a stress test: 1000 clients connect to the server, each client echoes 100 messages to the server, and once all clients have finished I get the total time for the whole process. The server was deployed on a Linux machine (VPS), and the clients were deployed on a Windows machine.

When I ran the stress test, I hit 2 problems:

Some clients got this error message:

java.io.IOException: An existing connection was forcibly closed by the remote host 
    at sun.nio.ch.SocketDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:447)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:110)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)

Other clients did not receive any message from the server.

Working environment:

  • Netty-all-4.0.30.Final

  • JDK1.8.0_25

  • Echo clients were deployed on Windows 7 Ultimate

  • Echo server was deployed on Linux CentOS 6

Class NettyClient:

public class NettyClient {
    private Bootstrap bootstrap;
    private EventLoopGroup group;

    public NettyClient(final ChannelInboundHandlerAdapter handler) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline().addLast(handler);
            }
        });
    }

    public void start(String host, int port) throws Exception {
        bootstrap.remoteAddress(new InetSocketAddress(host, port));
        bootstrap.connect();
    }

    public void stop() {
        try {
            group.shutdownGracefully().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Class NettyServer:

public class NettyServer {
    private EventLoopGroup parentGroup;
    private EventLoopGroup childGroup;
    private ServerBootstrap bootstrap;

    public NettyServer(final ChannelInboundHandlerAdapter handler) {
        // parentGroup accepts incoming connections, childGroup handles their I/O
        parentGroup = new NioEventLoopGroup(300);
        childGroup = new NioEventLoopGroup(300);
        bootstrap = new ServerBootstrap();
        bootstrap.group(parentGroup, childGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline().addLast(handler);
            }
        });
    }

    public void start(int port) throws Exception {
        bootstrap.localAddress(new InetSocketAddress(port));
        ChannelFuture future = bootstrap.bind().sync();
        System.err.println("Start Netty server on port " + port);
        // Blocks until the server channel is closed
        future.channel().closeFuture().sync();
    }

    public void stop() throws Exception {
        parentGroup.shutdownGracefully().sync();
        childGroup.shutdownGracefully().sync();
    }
}

Class EchoClient:

public class EchoClient {
    private static final String HOST = "203.12.37.22";
    private static final int PORT = 3344;
    private static final int NUMBER_CONNECTION = 1000;
    private static final int NUMBER_ECHO = 10;
    private static CountDownLatch counter = new CountDownLatch(NUMBER_CONNECTION);

    public static void main(String[] args) throws Exception {
        List<NettyClient> listClients = Collections.synchronizedList(new ArrayList<NettyClient>());
        for (int i = 0; i < NUMBER_CONNECTION; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        NettyClient client = new NettyClient(new EchoClientHandler(NUMBER_ECHO) {
                            @Override
                            protected void onFinishEcho() {
                                counter.countDown();
                                System.err.println((NUMBER_CONNECTION - counter.getCount()) + "/" + NUMBER_CONNECTION);
                            }
                        });
                        client.start(HOST, PORT);
                        listClients.add(client);
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }).start();
        }

        long t1 = System.currentTimeMillis();
        counter.await();
        long t2 = System.currentTimeMillis();
        System.err.println("Total time: " + (t2 - t1));

        for (NettyClient client : listClients) {
            client.stop();
        }
    }

    private static class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

        private static final String ECHO_MSG = "Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo";
        private int numberEcho;
        private int curNumberEcho = 0;

        public EchoClientHandler(int numberEcho) {
            this.numberEcho = numberEcho;
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_MSG, CharsetUtil.UTF_8));
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            curNumberEcho++;
            if (curNumberEcho >= numberEcho) {
                onFinishEcho();
            } else {
                ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_MSG, CharsetUtil.UTF_8));
            }
        }

        protected void onFinishEcho() {

        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

Class EchoServer:

public class EchoServer {
    private static final int PORT = 3344;

    public static void main(String[] args) throws Exception {
        NettyServer server = new NettyServer(new EchoServerHandler());
        server.start(PORT);
        System.err.println("Start server on port " + PORT);
    }

    @Sharable
    private static class EchoServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.write(msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
}
Hong Phat
  • As I did not have access to those examples, could you print the code and in particular how you manage the pipeline and the close after 100 iterations for each client? I myself refer to the example Echo from Netty github directly which is made for 1 client at a time as is... – Frederic Brégier Aug 07 '15 at 15:59
  • A search turned up this question: http://stackoverflow.com/questions/2582036/an-existing-connection-was-forcibly-closed-by-the-remote-host?rq=1. It is probably malformed data being sent and triggering a corner case by some of your clients. – laughing_man Aug 08 '15 at 03:13
  • If the echo handlers look like the ones within Netty, the format of the data is of no importance. But the resources might be exhausted, which is the reason I asked for the pipeline configuration... – Frederic Brégier Aug 08 '15 at 08:28
  • I have updated my example. Please take a look, many thanks – Hong Phat Aug 09 '15 at 10:38

1 Answer

You could change two things:

  1. Create only one client bootstrap and reuse it for all your clients instead of creating one per client. Move the bootstrap construction out of the client class and keep only the connect call, as you already do in your start method. This limits the number of threads created internally.

  2. Close the connection on the client side once the number of ping-pong exchanges is reached. Currently you only call the empty method onFinishEcho, which closes nothing on the client side, so no client stops and no channel is closed either.

You might have reached a limit on the number of threads on the client side.
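
The thread-limiting effect of sharing can be sketched with plain JDK classes. The example below is only an analogy, not Netty code: a fixed thread pool stands in for a single shared EventLoopGroup, and the class name SharedPoolSketch, the method distinctWorkerThreads, and the pool size of 4 are hypothetical choices made for illustration. It shows that 1000 logical clients handed to one shared pool are served by at most as many threads as the pool holds.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy (hypothetical names): one shared pool instead of one pool per client.
final class SharedPoolSketch {

    /**
     * Runs `clients` short tasks on ONE shared pool of `poolSize` threads and
     * returns how many distinct worker threads actually executed them.
     */
    static int distinctWorkerThreads(int clients, int poolSize) {
        // Stand-in for a single EventLoopGroup shared by every connection.
        ExecutorService shared = Executors.newFixedThreadPool(poolSize);
        Set<String> workers = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(clients);
        try {
            for (int i = 0; i < clients; i++) {
                shared.execute(() -> {
                    // Record which pool thread served this logical client.
                    workers.add(Thread.currentThread().getName());
                    done.countDown();
                });
            }
            if (!done.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            shared.shutdown();
        }
        return workers.size();
    }

    public static void main(String[] args) {
        int used = distinctWorkerThreads(1000, 4);
        System.out.println("1000 logical clients were served by " + used + " thread(s)");
    }
}
```

In the Netty client from the question, the corresponding change would presumably be to build one NioEventLoopGroup and one Bootstrap and call connect() on that bootstrap once per client. Because EchoClientHandler keeps per-connection state, a fresh handler instance would then have to be created inside initChannel rather than passed in once. For the second point, closing the channel (for example with ctx.close()) at the place where onFinishEcho is called would let the server see an orderly shutdown.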

Another possible issue: you do not specify any codec (a string codec or otherwise), so a partial transmission from the client or the server could be treated as a full response.

For instance, the first block of "Echo Echo Echo" might arrive in one packet containing the beginning of your buffer, while the remaining parts (more "Echo") are sent in later packets.

To prevent this, use a codec so that your final handler receives a complete message, not a partial one. Otherwise you might run into other issues, such as an error on the server side when it tries to send an extra packet after the client has closed the channel sooner than expected.
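
The partial-message problem can be illustrated without Netty. The sketch below is a plain JDK stand-in for a fixed-length frame codec; the class FixedLengthFramer and its feed method are hypothetical names, not part of Netty. It buffers whatever each read delivers and only hands out a frame once a full message length has accumulated, so a message split across two reads is still counted once.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: reassembles fixed-length messages from arbitrary read chunks.
final class FixedLengthFramer {
    private final int frameLength;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    FixedLengthFramer(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frameLength = frameLength;
    }

    /** Feeds one chunk, as delivered by a single read, and returns every frame it completes. */
    List<byte[]> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] all = pending.toByteArray();
        List<byte[]> frames = new ArrayList<>();
        int offset = 0;
        // Emit as many complete frames as the buffered bytes allow.
        while (all.length - offset >= frameLength) {
            frames.add(Arrays.copyOfRange(all, offset, offset + frameLength));
            offset += frameLength;
        }
        // Keep the incomplete tail for the next read.
        pending.reset();
        pending.write(all, offset, all.length - offset);
        return frames;
    }

    public static void main(String[] args) {
        byte[] msg = "Echo Echo Echo".getBytes(StandardCharsets.UTF_8);
        FixedLengthFramer framer = new FixedLengthFramer(msg.length);
        // Simulate TCP delivering one message in two separate reads.
        int afterFirst = framer.feed(Arrays.copyOfRange(msg, 0, 5)).size();
        int afterSecond = framer.feed(Arrays.copyOfRange(msg, 5, msg.length)).size();
        System.out.println("frames after read 1: " + afterFirst + ", after read 2: " + afterSecond);
    }
}
```

Without such a step, the client handler in the question increments its echo counter once per read rather than once per message, so a reply split over two reads would be counted twice. Netty ships decoders in io.netty.handler.codec that play this role, for example FixedLengthFrameDecoder or LineBasedFrameDecoder, placed in the pipeline ahead of the echo handler. Which one fits depends on how the messages are delimited, which the question does not specify.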

Frederic Brégier