I have the following code snipped that creates a TCPServer, and attaches a ChannelHandler to the channel in the doOnChannelInit()
function. The server is to process byte data from an embedded device.
@Component
@RequiredArgsConstructor
public class NettyServer {
Logger log = LoggerFactory.getLogger(NettyServer.class);
private final NettyProperties nettyProperties;
private final NettyServerHandler nettyServerHandler;
private TcpServer server;
public void run() {
server = TcpServer
.create()
.host("localhost")
.port(nettyProperties.getTcpPort())
.doOnChannelInit((connectionObserver, channel, remoteAddress) -> {
log.info("Connection from " + remoteAddress);
channel.pipeline()
.addLast("idleStateHandler", new IdleStateHandler(0, 0, 4, TimeUnit.MINUTES))
.addLast(new ByteArrayDecoder())
.addLast(new ByteArrayEncoder())
.addLast(nettyServerHandler);
});
server.bindNow();
log.info("Server running");
}
}
Channel handler
@Component
@RequiredArgsConstructor
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
Logger log = LoggerFactory.getLogger(NettyServerHandler.class);
private final AttributeKey<byte[]> dataKey = AttributeKey.valueOf("dataBuf");
private final AttributeKey<Integer> dataLen = AttributeKey.valueOf("dataBufLen");
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("New Meter connection from : " + ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel() != null) {
log.info(String.format("Meter/Client Disconnected. No: %s ; Channel : %s", meterNo, ctx.channel()));
}
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Exception {
log.info("Message received: " + new String(msg);
ctx.channel().read();
}
}
I'm able to connect to the server, but when i send a message, nothing happens, the log statements are not triggered.
I'm not sure what I'm missing here, would appreciate some help.
Thanks
There is no need to add custom handlers to the Netty pipeline. The example above can be written like this:
@Component
@RequiredArgsConstructor
public class NettyServer {
Logger log = LoggerFactory.getLogger(NettyServer.class);
private final NettyProperties nettyProperties;
private TcpServer server;
public void run() {
server = TcpServer
.create()
.host("localhost")
.port(nettyProperties.getTcpPort())
.doOnChannelInit((connectionObserver, channel, remoteAddress) -> {
log.info("Connection from " + remoteAddress);
channel.pipeline()
.addLast("idleStateHandler", new IdleStateHandler(0, 0, 4, TimeUnit.MINUTES));
})
.handle((in, out) ->
in.receive()
.asString()
.doOnNext(s -> log.info("Message received: " + s))
.then());
server.bindNow();
log.info("Server running");
}
}
Consider checking the Reference Documentation
The incoming data can be transformed to String
with (asString
), to byte[]
with (asByteArray
) etc. If there is no suitable transformation you can use map(byteBuf -> ...)
and transform the ByteBuf
to the needed abstraction.