0

I'm writing code in netty that should connect to a remote server.

Below is my complete code.


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.logging.LogLevel;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.example.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Small Netty client helper that owns a single {@link Bootstrap} and opens outbound
 * connections through it.
 *
 * <p>Every connection gets a pipeline made of a lifecycle-logging handler, an
 * {@link HttpRequestEncoder} and, when supplied, the caller's own handler.
 *
 * <p>The {@link #getLevel() level} property is only stored; this class does not install
 * any handler based on it.
 */
public class NettyConnectionDemo {

    /** Shared logger; kept package-private so same-package code can keep using it. */
    static final Logger log = LoggerFactory.getLogger(NettyConnectionDemo.class);

    protected final Class<? extends SocketChannel> channelClass;
    protected final EventLoopGroup ioWorkers;

    /** Bootstrap reused for every connect; also serves as the lock in {@link #doBooterConnect}. */
    protected final Bootstrap booter = new Bootstrap();

    protected LogLevel level;

    /** Connect timeout in milliseconds, applied to the bootstrap on every connect attempt. */
    protected long connectTimeOutMills = TimeUnit.SECONDS.toMillis(3);

    /**
     * Creates a client on top of a caller-supplied channel type and event loop group.
     *
     * @param channelClass socket channel implementation to instantiate for each connection
     * @param ioWorkers event loop group that will drive the connections
     */
    public NettyConnectionDemo(Class<? extends SocketChannel> channelClass, EventLoopGroup ioWorkers) {
        this.channelClass = channelClass;
        this.ioWorkers = ioWorkers;
        init();
    }

    /** Creates a client with {@code 0} passed as the I/O thread count. */
    public NettyConnectionDemo() {
        this(0);
    }

    /**
     * Creates a client that owns its event loop group, using epoll when
     * {@link Epoll#isAvailable()} reports it and NIO otherwise.
     *
     * @param ioThreads number of I/O threads handed to the event loop group; negative
     *     values are treated as {@code 0}
     */
    public NettyConnectionDemo(int ioThreads) {
        int threads = Math.max(ioThreads, 0);
        if (Epoll.isAvailable()) {
            this.channelClass = EpollSocketChannel.class;
            this.ioWorkers = new EpollEventLoopGroup(threads, new NamedThreadFactory("ClientConfig-ioWorkers", true));
        } else {
            this.channelClass = NioSocketChannel.class;
            this.ioWorkers = new NioEventLoopGroup(threads, new NamedThreadFactory("ClientConfig-ioWorkers", true));
        }
        init();
    }

    public Class<? extends SocketChannel> getChannelClass() {
        return channelClass;
    }

    public EventLoopGroup getIoWorkers() {
        return ioWorkers;
    }

    public LogLevel getLevel() {
        return level;
    }

    public void setLevel(LogLevel level) {
        this.level = level;
    }

    public long getConnectTimeOutMills() {
        return connectTimeOutMills;
    }

    /**
     * Sets the connect timeout used by subsequent connect attempts.
     *
     * @param connectTimeOutMills timeout in milliseconds
     */
    public void setConnectTimeOutMills(long connectTimeOutMills) {
        this.connectTimeOutMills = connectTimeOutMills;
    }

    /** Starts a graceful shutdown of the I/O event loop group. */
    public void destroy() {
        if (ioWorkers != null) {
            ioWorkers.shutdownGracefully();
        }
    }

    /**
     * Misspelled predecessor of {@link #destroy()}, retained so existing callers keep working.
     *
     * @deprecated use {@link #destroy()}
     */
    @Deprecated
    public void destory() {
        destroy();
    }

    @ChannelHandler.Sharable
    class ShareableChannelInboundHandler extends ChannelInboundHandlerAdapter {}

    Bootstrap getBooter() {
        return booter;
    }

    /** Binds the event loop group and channel type to the shared bootstrap. */
    private void init() {
        booter.group(ioWorkers);
        booter.channel(channelClass);
    }

    /**
     * Applies channel options to the shared bootstrap. Invoked before every connect so
     * that a changed {@link #setConnectTimeOutMills(long) connect timeout} takes effect.
     *
     * <p>The timeout is clamped to {@code [0, Integer.MAX_VALUE]} because the channel
     * option is an {@code int}.
     */
    protected void initBooterOptions() {
        long clamped = Math.max(0L, Math.min(connectTimeOutMills, (long) Integer.MAX_VALUE));
        booter.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) clamped);
    }

    /**
     * Builds the one-shot bootstrap handler that populates a new channel's pipeline when
     * the channel is registered and then removes itself.
     *
     * @param init caller-supplied handler appended last; may be {@code null}, in which
     *     case nothing is appended
     * @param closeListener callback invoked when the channel becomes inactive; may be
     *     {@code null}
     * @return the handler to install on the bootstrap
     */
    protected ChannelHandler initHandlerAdapter(ChannelHandler init, Consumer<ChannelHandlerContext> closeListener) {
        return new ShareableChannelInboundHandler() {
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                Channel ch = ctx.channel();
                ch.pipeline().addLast(newLifecycleLogger(closeListener));
                ch.pipeline().addLast(new HttpRequestEncoder());
                // Explicit guard instead of handing null to the varargs addLast.
                if (init != null) {
                    ch.pipeline().addLast(init);
                }
                // Setup is done: drop this handler, then pass the event on to the
                // handlers that were just added.
                ctx.pipeline().remove(this);
                ctx.fireChannelRegistered();
            }
        };
    }

    /**
     * Creates the handler that logs channel lifecycle events and notifies
     * {@code closeListener} when the channel becomes inactive.
     */
    private ChannelHandler newLifecycleLogger(Consumer<ChannelHandlerContext> closeListener) {
        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                log.info("channelRegistered:{}", ctx.channel());
                super.channelRegistered(ctx);
            }

            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                log.info("channelActive:{}", ctx.channel());
                super.channelActive(ctx);
            }

            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                log.info("channelInactive:{}", ctx.channel());
                if (closeListener != null) {
                    try {
                        closeListener.accept(ctx);
                    } catch (Throwable e) {
                        // Best effort: a failing listener must not stop event propagation.
                        log.error(e.getMessage(), e);
                    }
                }
                super.channelInactive(ctx);
            }

            @Override
            public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
                log.info("channelUnregistered:{}", ctx.channel());
                super.channelUnregistered(ctx);
            }
        };
    }

    /**
     * Installs a fresh pipeline-initialising handler on the shared bootstrap and starts
     * a connect. The option refresh, handler swap and connect call all run while holding
     * the bootstrap's monitor, so concurrent callers do not overwrite each other's
     * handler between {@code handler(...)} and {@code connect(...)}.
     *
     * @param address remote address to connect to
     * @param init caller-supplied handler for the new channel; may be {@code null}
     * @param closeListener callback for channel inactivity; may be {@code null}
     * @return the future of the connect attempt
     */
    protected ChannelFuture doBooterConnect(InetSocketAddress address, final ChannelHandler init, Consumer<ChannelHandlerContext> closeListener) {
        synchronized (booter) {
            initBooterOptions();
            booter.handler(initHandlerAdapter(init, closeListener));
            return booter.connect(address);
        }
    }

    /**
     * Connects to {@code address} without a caller-supplied handler.
     *
     * @param address remote address to connect to
     * @return the future of the connect attempt
     */
    public final ChannelFuture connect(InetSocketAddress address) {
        return doBooterConnect(address, null, null);
    }

    /**
     * Connects to {@code address} and appends {@code handler} to the new channel's pipeline.
     *
     * @param address remote address to connect to
     * @param handler handler appended after the built-in ones; may be {@code null}
     * @return the future of the connect attempt
     */
    public final ChannelFuture connect(InetSocketAddress address, ChannelHandler handler) {
        return doBooterConnect(address, handler, null);
    }

    /**
     * Demo entry point: connects to google.com:80, prints the channel, then closes it
     * and shuts the event loop group down so its threads do not outlive the demo.
     */
    public static void main(String[] args) throws InterruptedException {
        NettyConnectionDemo cb = new NettyConnectionDemo(NioSocketChannel.class, new NioEventLoopGroup());
        try {
            ChannelFuture cf = cb.connect(new InetSocketAddress("google.com", 80)).syncUninterruptibly();
            System.out.println(cf.channel());
            cf.channel().close().syncUninterruptibly();
        } finally {
            // Runs on connect failure too, so the I/O threads are always released.
            cb.destroy();
        }
    }
}

This code is working fine when I connect to remote machines like google.com, yahoo.com etc.

When I change the code as shown below to connect to a locally running HTTP server, it throws an exception. The same server responds fine when I make a REST call to it with curl.

ChannelFuture cf = cb.connect(new InetSocketAddress("localhost", 3000)).syncUninterruptibly();
ChannelFuture cf = cb.connect(new InetSocketAddress("127.0.0.1", 3000)).syncUninterruptibly();

Both of them are not working.

The exception I'm getting is -

SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
Exception in thread "main" io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:3000
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:750)

How can I make it work with local server as well?

user51
  • 8,843
  • 21
  • 79
  • 158
  • Are you able to bring up `http://localhost:3000` in your browser? What process is running your server? – Tim Roberts Aug 13 '23 at 00:31
  • TimRoberts Yes. I'm able to connect to http://localhost:3000. It is just a docusaurus website. You can run it using `npx create-docusaurus@latest my-website classic --typescript` . See here for more details https://docusaurus.io/docs/installation. – user51 Aug 13 '23 at 00:40
  • Your exception has the URL as `localhost/127.0.0.1:3000`, which is wrong. – Tim Roberts Aug 13 '23 at 00:52
  • 1
    Thats from the log serialized by netty. actual input is "127.0.0.1" – user51 Aug 13 '23 at 00:54

1 Answer

1

When you connect using Netty, it uses the NIO system library to connect to a single endpoint; it does not apply any of the smarter fallback logic that curl or your browser does.

The issue here is that the server that you have created using node is running on localhost. Node resolves domain names according to the system priority, so the first result of localhost becomes the IPv6 ::1.

When you run your Java process, you are not starting it with the -Djava.net.preferIPv6Addresses=true flag, so Java runs in a compatibility mode for older programs and always resolves any domain to IPv4 first. For localhost, this is 127.0.0.1. Your server is not listening on that IP, and Netty only tries to connect to the single address it resolved, so the connection is refused.

Example of this going wrong:

$ netstat -nlpa
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address   Foreign Address  State       PID/Program name
tcp6       0      0 ::1:3000        :::*             LISTEN      11684/node

$ jshell
jshell> java.net.InetAddress.getByName("localhost");
$1 ==> localhost/127.0.0.1

$ jshell -R-Djava.net.preferIPv6Addresses=true -J-Djava.net.preferIPv6Addresses=true
jshell> java.net.InetAddress.getByName("localhost");
$1 ==> localhost/0:0:0:0:0:0:0:1

To fix this, use one of the following solutions:

  • Always pass the flag -Djava.net.preferIPv6Addresses=true when starting your program to disable backwards compatibility
  • Resolve a domain to all of its IP addresses and try them in order until a timeout is reached
  • Resolve a domain to all of its IP addresses and connect to them concurrently, with some delay before each new attempt, then pick the first one that works (the Happy Eyeballs algorithm, used by curl and Chromium; curl defaults to a 200 ms delay)
Ferrybig
  • 18,194
  • 6
  • 57
  • 79
  • I tried adding `-Djava.net.preferIPv6Addresses=true` to java. It still not working. But this time I noticed I'm not getting connection refused but it hangs. Any idea? – user51 Aug 16 '23 at 21:17
  • Using `NetUtil.LOCALHOST6` for the hostname fixed the problem. Thanks @Ferrybig – user51 Aug 17 '23 at 20:59