I'm writing code with Netty that should connect to a remote server.
Below is my complete code.
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.logging.LogLevel;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.example.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Thin wrapper around a Netty {@link Bootstrap} that opens outbound socket connections.
 *
 * <p>Each connection gets a pipeline consisting of a lifecycle-logging handler, an
 * {@link HttpRequestEncoder} and, optionally, a caller-supplied handler.
 *
 * <p>Connect calls are serialized on the shared bootstrap because the bootstrap's handler is
 * replaced for every connect attempt.
 */
public class NettyConnectionDemo {

    /** Shared logger; immutable and independent of any instance. */
    static final Logger log = LoggerFactory.getLogger(NettyConnectionDemo.class);

    /** Name handed to the thread factory of internally created I/O worker groups. */
    private static final String IO_WORKER_THREAD_NAME = "ClientConfig-ioWorkers";

    protected final Class<? extends SocketChannel> channelClass;
    protected final EventLoopGroup ioWorkers;
    protected final Bootstrap booter = new Bootstrap();

    /** Log level exposed through {@link #getLevel()} / {@link #setLevel(LogLevel)}. */
    protected LogLevel level;

    /** Connect timeout in milliseconds, applied to the bootstrap before every connect. */
    protected long connectTimeOutMills = TimeUnit.SECONDS.toMillis(3);

    /**
     * Creates a client using the given channel implementation and event loop group.
     *
     * @param channelClass socket channel implementation to instantiate for each connection
     * @param ioWorkers event loop group that drives the connections
     */
    public NettyConnectionDemo(Class<? extends SocketChannel> channelClass, EventLoopGroup ioWorkers) {
        this.channelClass = channelClass;
        this.ioWorkers = ioWorkers;
        init();
    }

    /** Creates a client with an internally created worker group, passing {@code 0} as thread count. */
    public NettyConnectionDemo() {
        this(0);
    }

    /**
     * Creates a client with an internally created worker group, using epoll when
     * {@link Epoll#isAvailable()} reports it and NIO otherwise.
     *
     * @param ioThreads number of I/O threads passed to the event loop group; negative values
     *     are treated as {@code 0}
     */
    public NettyConnectionDemo(int ioThreads) {
        if (ioThreads < 0) {
            ioThreads = 0;
        }
        EventLoopGroup workerGroup;
        Class<? extends SocketChannel> channelClass;
        if (Epoll.isAvailable()) {
            channelClass = EpollSocketChannel.class;
            workerGroup = new EpollEventLoopGroup(ioThreads, new NamedThreadFactory(IO_WORKER_THREAD_NAME, true));
        } else {
            channelClass = NioSocketChannel.class;
            workerGroup = new NioEventLoopGroup(ioThreads, new NamedThreadFactory(IO_WORKER_THREAD_NAME, true));
        }
        this.channelClass = channelClass;
        this.ioWorkers = workerGroup;
        init();
    }

    /** @return the socket channel implementation used for new connections */
    public Class<? extends SocketChannel> getChannelClass() {
        return channelClass;
    }

    /** @return the event loop group that drives the connections */
    public EventLoopGroup getIoWorkers() {
        return ioWorkers;
    }

    /** @return the configured log level, possibly {@code null} */
    public LogLevel getLevel() {
        return level;
    }

    /** @param level the log level to store */
    public void setLevel(LogLevel level) {
        this.level = level;
    }

    /** @return the connect timeout in milliseconds */
    public long getConnectTimeOutMills() {
        return connectTimeOutMills;
    }

    /**
     * Sets the connect timeout used by subsequent connect calls.
     *
     * @param connectTimeOutMills timeout in milliseconds; a negative value removes the explicit
     *     option from the bootstrap so that Netty's own default applies
     */
    public void setConnectTimeOutMills(long connectTimeOutMills) {
        this.connectTimeOutMills = connectTimeOutMills;
    }

    /** Starts a graceful shutdown of the I/O worker group, if there is one. */
    public void destroy() {
        if (ioWorkers != null) {
            ioWorkers.shutdownGracefully();
        }
    }

    /**
     * Misspelled predecessor of {@link #destroy()}, kept so existing callers continue to work.
     *
     * @deprecated use {@link #destroy()} instead
     */
    @Deprecated
    public void destory() {
        destroy();
    }

    /** Marker base class that lets the per-connect bootstrap handler be declared sharable. */
    @ChannelHandler.Sharable
    class ShareableChannelInboundHandler extends ChannelInboundHandlerAdapter {}

    /** @return the underlying bootstrap */
    Bootstrap getBooter() {
        return booter;
    }

    /** Binds the event loop group and channel class to the bootstrap. */
    private void init() {
        booter.group(ioWorkers);
        booter.channel(channelClass);
    }

    /**
     * Applies the configured connect timeout to the bootstrap.
     *
     * <p>Invoked before every connect so that changes made through
     * {@link #setConnectTimeOutMills(long)} take effect.
     */
    protected void initBooterOptions() {
        long timeout = connectTimeOutMills;
        if (timeout < 0) {
            // A null value removes a previously set option, leaving Netty's default in place.
            booter.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, null);
        } else {
            // The option is an Integer; clamp instead of letting the cast overflow.
            booter.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) Math.min(Integer.MAX_VALUE, timeout));
        }
    }

    /**
     * Builds the bootstrap handler that populates a new channel's pipeline on registration.
     *
     * @param init optional caller handler appended after the HTTP request encoder; may be
     *     {@code null}
     * @param closeListener optional callback invoked when the channel becomes inactive; may be
     *     {@code null}
     * @return the handler to install on the bootstrap
     */
    protected ChannelHandler initHandlerAdapter(ChannelHandler init, Consumer<ChannelHandlerContext> closeListener) {
        return new ShareableChannelInboundHandler() {
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                Channel ch = ctx.channel();
                ch.pipeline().addLast(newLifecycleLogger(closeListener));
                ch.pipeline().addLast(new HttpRequestEncoder());
                if (init != null) {
                    ch.pipeline().addLast(init);
                }
                // This handler only sets up the pipeline; drop it and pass the event on.
                ctx.pipeline().remove(this);
                ctx.fireChannelRegistered();
            }
        };
    }

    /** Creates the handler that logs channel lifecycle events and notifies the close listener. */
    private ChannelHandler newLifecycleLogger(Consumer<ChannelHandlerContext> closeListener) {
        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                log.info("channelRegistered:{}", ctx.channel());
                super.channelRegistered(ctx);
            }

            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                log.info("channelActive:{}", ctx.channel());
                super.channelActive(ctx);
            }

            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                log.info("channelInactive:{}", ctx.channel());
                if (closeListener != null) {
                    try {
                        closeListener.accept(ctx);
                    } catch (Throwable e) {
                        // A failing listener must not stop the event from propagating.
                        log.error(e.getMessage(), e);
                    }
                }
                super.channelInactive(ctx);
            }

            @Override
            public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
                log.info("channelUnregistered:{}", ctx.channel());
                super.channelUnregistered(ctx);
            }
        };
    }

    /**
     * Installs a fresh pipeline-initializing handler on the bootstrap and starts a connect.
     *
     * @param address remote address to connect to
     * @param init optional caller handler for the new channel; may be {@code null}
     * @param closeListener optional callback for when the channel becomes inactive; may be
     *     {@code null}
     * @return the future of the connect attempt
     */
    protected ChannelFuture doBooterConnect(InetSocketAddress address, final ChannelHandler init, Consumer<ChannelHandlerContext> closeListener) {
        // The bootstrap is shared and its handler is swapped per call, so configure and
        // connect atomically.
        synchronized (booter) {
            initBooterOptions();
            booter.handler(initHandlerAdapter(init, closeListener));
            return booter.connect(address);
        }
    }

    /**
     * Connects to {@code address} without a caller-supplied handler.
     *
     * @param address remote address to connect to
     * @return the future of the connect attempt
     */
    public final ChannelFuture connect(InetSocketAddress address) {
        return doBooterConnect(address, null, null);
    }

    /**
     * Connects to {@code address} and appends {@code handler} to the new channel's pipeline.
     *
     * @param address remote address to connect to
     * @param handler handler appended after the HTTP request encoder; may be {@code null}
     * @return the future of the connect attempt
     */
    public final ChannelFuture connect(InetSocketAddress address, ChannelHandler handler) {
        return doBooterConnect(address, handler, null);
    }

    /**
     * Demo entry point: connects, prints the channel, then releases all resources.
     *
     * @param args optional {@code host} (default {@code google.com}) and {@code port}
     *     (default {@code 80})
     * @throws InterruptedException declared for backward compatibility
     */
    public static void main(String[] args) throws InterruptedException {
        String host = args.length > 0 ? args[0] : "google.com";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 80;
        NettyConnectionDemo cb = new NettyConnectionDemo(NioSocketChannel.class, new NioEventLoopGroup());
        try {
            ChannelFuture cf = cb.connect(new InetSocketAddress(host, port)).syncUninterruptibly();
            System.out.println(cf.channel());
            cf.channel().close().syncUninterruptibly();
        } finally {
            // Shut the event loop group down on success and on failure alike.
            cb.destroy();
        }
    }
}
This code works fine when I connect to remote hosts such as google.com or yahoo.com.
When I change the code as shown below to connect to a locally running HTTP server, it throws an exception, even though a REST call to the same server with curl works fine.
ChannelFuture cf = cb.connect(new InetSocketAddress("localhost", 3000)).syncUninterruptibly();
ChannelFuture cf = cb.connect(new InetSocketAddress("127.0.0.1", 3000)).syncUninterruptibly();
Neither of them works.
The exception I get is:
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
Exception in thread "main" io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:3000
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:750)
How can I make it work with a local server as well?