Tags: netty, netty4

netty Channel is connecting to remote servers but not localhost/127.0.0.1


I'm writing code in netty that should connect to a remote server.

Below is my complete code.


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.logging.LogLevel;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.example.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Small Netty client helper that owns a single {@link Bootstrap} and opens outbound
 * connections on a shared {@link EventLoopGroup}.
 *
 * <p>Each call to {@code connect} installs a one-shot initializer on the bootstrap that, once
 * the new channel is registered, adds a lifecycle-logging handler, an
 * {@link HttpRequestEncoder}, and the caller-supplied handler (if any) to the pipeline.
 *
 * <p>Connect calls are serialized on the bootstrap instance because the bootstrap's handler
 * and options are mutated before every connect.
 */
public class NettyConnectionDemo {

    /** Shared logger; static so the anonymous handlers do not each resolve their own. */
    static final Logger log = LoggerFactory.getLogger(NettyConnectionDemo.class);

    protected final Class<? extends SocketChannel> channelClass;
    protected final EventLoopGroup ioWorkers;

    /** Stored log level; only exposed through the getter/setter in this class. */
    protected LogLevel level;

    /** Connect timeout in milliseconds, applied to the bootstrap on every connect. */
    protected long connectTimeOutMills = TimeUnit.SECONDS.toMillis(3);

    protected final Bootstrap booter = new Bootstrap();

    /**
     * Creates a client on top of a caller-provided channel type and event loop group.
     *
     * @param channelClass the socket channel implementation to instantiate per connection
     * @param ioWorkers    the event loop group that will drive the channels
     */
    public NettyConnectionDemo(Class<? extends SocketChannel> channelClass, EventLoopGroup ioWorkers) {
        this.channelClass = channelClass;
        this.ioWorkers = ioWorkers;
        init();
    }

    /** Creates a client with {@code 0} passed as the I/O thread count. */
    public NettyConnectionDemo() {
        this(0);
    }

    /**
     * Creates a client that picks the epoll transport when {@link Epoll#isAvailable()} reports
     * it is available and falls back to NIO otherwise.
     *
     * @param ioThreads number of I/O threads handed to the event loop group; negative values
     *                  are treated as {@code 0}
     */
    public NettyConnectionDemo(int ioThreads) {
        int threads = Math.max(ioThreads, 0);
        if (Epoll.isAvailable()) {
            this.channelClass = EpollSocketChannel.class;
            this.ioWorkers =
                    new EpollEventLoopGroup(threads, new NamedThreadFactory("ClientConfig-ioWorkers", true));
        } else {
            this.channelClass = NioSocketChannel.class;
            this.ioWorkers =
                    new NioEventLoopGroup(threads, new NamedThreadFactory("ClientConfig-ioWorkers", true));
        }
        init();
    }

    public Class<? extends SocketChannel> getChannelClass() {
        return channelClass;
    }

    public EventLoopGroup getIoWorkers() {
        return ioWorkers;
    }

    public LogLevel getLevel() {
        return level;
    }

    public void setLevel(LogLevel level) {
        this.level = level;
    }

    public long getConnectTimeOutMills() {
        return connectTimeOutMills;
    }

    /**
     * Sets the connect timeout used by subsequent connect calls.
     *
     * @param connectTimeOutMills timeout in milliseconds
     */
    public void setConnectTimeOutMills(long connectTimeOutMills) {
        this.connectTimeOutMills = connectTimeOutMills;
    }

    /**
     * Shuts down the event loop group, if one is set. The misspelled name is kept so existing
     * callers keep compiling.
     */
    public void destory() {
        if (ioWorkers != null) {
            ioWorkers.shutdownGracefully();
        }
    }

    /** Marker base class that lets the per-connect initializer be flagged as sharable. */
    @ChannelHandler.Sharable
    class ShareableChannelInboundHandler extends ChannelInboundHandlerAdapter {}

    Bootstrap getBooter() {
        return booter;
    }

    /** Binds the event loop group and channel type to the bootstrap. */
    private void init() {
        booter.group(ioWorkers);
        booter.channel(channelClass);
    }

    /**
     * Applies channel options to the bootstrap. Invoked before every connect so that a timeout
     * changed through {@link #setConnectTimeOutMills(long)} actually takes effect.
     */
    protected void initBooterOptions() {
        // The option is an Integer; clamp the long setting into the non-negative int range.
        long configured = getConnectTimeOutMills();
        int timeoutMillis = (int) Math.max(0L, Math.min(configured, (long) Integer.MAX_VALUE));
        booter.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis);
    }

    /**
     * Builds the one-shot initializer installed as the bootstrap handler for a connect call.
     *
     * @param init          handler appended last to the pipeline; may be {@code null}
     * @param closeListener callback invoked when the channel becomes inactive; may be
     *                      {@code null}
     * @return the initializer handler
     */
    protected ChannelHandler initHandlerAdapter(ChannelHandler init, Consumer<ChannelHandlerContext> closeListener) {
        return new ShareableChannelInboundHandler() {
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                Channel ch = ctx.channel();
                ch.pipeline().addLast(newLifecycleLogger(closeListener));
                ch.pipeline().addLast(new HttpRequestEncoder());
                if (init != null) {
                    ch.pipeline().addLast(init);
                }
                // The initializer has done its job: drop it, then forward the event so the
                // handlers added above observe the registration as well.
                ctx.pipeline().remove(this);
                ctx.fireChannelRegistered();
            }
        };
    }

    /**
     * Creates the handler that logs channel lifecycle events and notifies the close listener.
     *
     * @param closeListener callback invoked on {@code channelInactive}; may be {@code null}
     */
    private ChannelHandler newLifecycleLogger(Consumer<ChannelHandlerContext> closeListener) {
        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                log.info("channelRegistered:{}", ctx.channel());
                super.channelRegistered(ctx);
            }

            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                log.info("channelActive:{}", ctx.channel());
                super.channelActive(ctx);
            }

            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                log.info("channelInactive:{}", ctx.channel());
                if (closeListener != null) {
                    try {
                        closeListener.accept(ctx);
                    } catch (Throwable e) {
                        // A failing listener must not stop the event from propagating.
                        log.error(e.getMessage(), e);
                    }
                }
                super.channelInactive(ctx);
            }

            @Override
            public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
                log.info("channelUnregistered:{}", ctx.channel());
                super.channelUnregistered(ctx);
            }
        };
    }

    /**
     * Configures the shared bootstrap for this attempt and starts the connect.
     *
     * @param address       remote address to connect to
     * @param init          handler appended to the new channel's pipeline; may be {@code null}
     * @param closeListener callback for channel inactivity; may be {@code null}
     * @return the future of the connect attempt
     */
    protected ChannelFuture doBooterConnect(InetSocketAddress address, final ChannelHandler init, Consumer<ChannelHandlerContext> closeListener) {
        // The bootstrap is shared mutable state: handler and options are swapped per attempt,
        // so the whole configure-and-connect sequence runs under one lock.
        synchronized (booter) {
            initBooterOptions();
            booter.handler(initHandlerAdapter(init, closeListener));
            return booter.connect(address);
        }
    }

    /** Connects to {@code address} without an application handler. */
    public final ChannelFuture connect(InetSocketAddress address) {
        return doBooterConnect(address, null, null);
    }

    /** Connects to {@code address} and appends {@code handler} to the channel pipeline. */
    public final ChannelFuture connect(InetSocketAddress address, ChannelHandler handler) {
        return doBooterConnect(address, handler, null);
    }

    public static void main(String[] args) throws InterruptedException {
        NettyConnectionDemo cb = new NettyConnectionDemo(NioSocketChannel.class, new NioEventLoopGroup());
        try {
            ChannelFuture cf = cb.connect(new InetSocketAddress("google.com", 80)).syncUninterruptibly();
            System.out.println(cf.channel());
        } finally {
            // Release the I/O threads whether or not the connect succeeded.
            cb.destory();
        }
    }
}

This code is working fine when I connect to remote machines like google.com, yahoo.com etc.

When I change the code as shown below to connect to a locally running HTTP server, the code throws an exception. The same REST call made with curl works fine.

ChannelFuture cf = cb.connect(new InetSocketAddress("localhost", 3000)).syncUninterruptibly();
ChannelFuture cf = cb.connect(new InetSocketAddress("127.0.0.1", 3000)).syncUninterruptibly();

Both of them are not working.

The exception I'm getting is -

SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
Exception in thread "main" io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:3000
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:750)

How can I make it work with local server as well?


Solution

  • When you connect using Netty, it uses the NIO system library to connect to a single endpoint; it does not apply any of the smarter fallback logic that curl or your browser does.

    The issue here is that the server that you have created using node is running on localhost. Node resolves domain names according to the system priority, so the first result of localhost becomes the IPv6 ::1.

    When you run your Java process, you are not starting it with the -Djava.net.preferIPv6Addresses=true flag, so Java runs in a compatibility mode for older programs and always resolves any domain to IPv4 first. For localhost, this is 127.0.0.1. Because your server is not listening on that IP, and Netty only tries to connect to a single resolved address, the connection is refused.

    Example of this going wrong:

    $ netstat -nlpa
    Active Internet connections (servers and established)
    Proto Recv-Q Send-Q Local Address   Foreign Address  State       PID/Program name
    tcp6       0      0 ::1:3000        :::*             LISTEN      11684/node
    
    $ jshell
    jshell> java.net.InetAddress.getByName("localhost");
    $1 ==> localhost/127.0.0.1
    
    $ jshell -R-Djava.net.preferIPv6Addresses=true -J-Djava.net.preferIPv6Addresses=true
    jshell> java.net.InetAddress.getByName("localhost");
    $1 ==> localhost/0:0:0:0:0:0:0:1
    

    To fix, use either solution: