project-reactorreactor-netty

How to add Channel Handler to TCPServer on Channel initialization


I have the following code snipped that creates a TCPServer, and attaches a ChannelHandler to the channel in the doOnChannelInit() function. The server is to process byte data from an embedded device.

@Component
@RequiredArgsConstructor
public class NettyServer {

    Logger log = LoggerFactory.getLogger(NettyServer.class);

    private final NettyProperties nettyProperties;

    private final NettyServerHandler nettyServerHandler;

    private TcpServer server;

    public void run() {
        server = TcpServer
                .create()
                .host("localhost")
                .port(nettyProperties.getTcpPort())
                .doOnChannelInit((connectionObserver, channel, remoteAddress) -> {
                    log.info("Connection from " + remoteAddress);
                    channel.pipeline()
                            .addLast("idleStateHandler", new IdleStateHandler(0, 0, 4, TimeUnit.MINUTES))
                            .addLast(new ByteArrayDecoder())
                            .addLast(new ByteArrayEncoder())
                            .addLast(nettyServerHandler);
                });

        server.bindNow();
        log.info("Server running");
    }

}

Channel handler

@Component
@RequiredArgsConstructor
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {

    Logger log = LoggerFactory.getLogger(NettyServerHandler.class);

    private final AttributeKey<byte[]> dataKey = AttributeKey.valueOf("dataBuf");

    private final AttributeKey<Integer> dataLen = AttributeKey.valueOf("dataBufLen");


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("New Meter connection from : " + ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel() != null) {
            log.info(String.format("Meter/Client Disconnected. No: %s ; Channel : %s", meterNo, ctx.channel()));

        }
        ctx.close();

    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Exception {

        log.info("Message received: " + new String(msg);
        ctx.channel().read();
    }
}

I'm able to connect to the server, but when i send a message, nothing happens, the log statements are not triggered.

I'm not sure what I'm missing here, would appreciate some help.

Thanks


Solution

  • There is no need to add custom handlers to the Netty pipeline. The example above can be written like this:

        @Component
        @RequiredArgsConstructor
        public class NettyServer {
    
            Logger log = LoggerFactory.getLogger(NettyServer.class);
    
            private final NettyProperties nettyProperties;
    
            private TcpServer server;
    
            public void run() {
                server = TcpServer
                        .create()
                        .host("localhost")
                        .port(nettyProperties.getTcpPort())
                        .doOnChannelInit((connectionObserver, channel, remoteAddress) -> {
                            log.info("Connection from " + remoteAddress);
                            channel.pipeline()
                                    .addLast("idleStateHandler", new IdleStateHandler(0, 0, 4, TimeUnit.MINUTES));
                        })
                        .handle((in, out) ->
                                in.receive()
                                        .asString()
                                        .doOnNext(s -> log.info("Message received: " + s))
                                        .then());
    
                server.bindNow();
                log.info("Server running");
            }
        }
    

    Consider checking the Reference Documentation The incoming data can be transformed to String with (asString), to byte[] with (asByteArray) etc. If there is no suitable transformation you can use map(byteBuf -> ...) and transform the ByteBuf to the needed abstraction.