I wrote a TCP server using Reactor Netty that receives a byte array request message from a client, processes it, and then returns a byte array response message to the client. The connections between server and client are for the most part long-lived unless some kind of fatal error occurs.
I have a use case where, if my server throws certain exceptions while processing the client's request, I need to keep the connection open but not send any data back to the client. Currently, when those exceptions get thrown, Netty automatically closes the connection. I tried adding an exceptionCaught(...) override in my custom handler, but it never seems to be reached. Setting the ChannelOption.AUTO_CLOSE flag to false also does not work, because that option only applies to exceptions thrown during a write(), and in this error case we never write any data back to the client.
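For reference, this is the option in question; as far as I can tell, it only controls whether the channel is closed automatically after a failed write(), not after an exception on the read side:

    // AUTO_CLOSE=false only prevents the automatic close after a failed write();
    // exceptions thrown while processing inbound data still close the channel.
    .childOption(ChannelOption.AUTO_CLOSE, false)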
Below is the workaround I came up with to ensure the exceptionCaught() method gets fired, so I can handle the exceptions appropriately and keep the connection open:
DisposableServer someTcpServer = TcpServer.create()
        .host("12.123.456.789")
        .port(12345)
        .wiretap(true)
        .doOnBind(server -> log.info("Starting listener..."))
        .doOnBound(server -> log.info("Listener started on host: {}, port: {}", server.host(), server.port()))
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childOption(ChannelOption.AUTO_CLOSE, false)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .doOnConnection(connection -> {
            InetSocketAddress socketAddress = (InetSocketAddress) connection.channel().remoteAddress();
            log.info("Client has connected. Host: {}, Port: {}",
                    socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
        })
        .doOnChannelInit((observer, channel, remoteAddress) ->
                channel.pipeline()
                        .addFirst(new TcpServerHandler())
        )
        .handle((inbound, outbound) ->
                inbound
                        .receive()
                        .asByteArray()
                        .flatMap(req -> processRequest(req))
                        .flatMap(rsp -> outbound.sendByteArray(Flux.just(rsp)))
                        // any exceptions thrown during the above processRequest()...
                        .onErrorResume(throwable -> {
                            // ...will get handled here
                            inbound.withConnection(connection ->
                                    connection.channel().pipeline().fireExceptionCaught(throwable));
                            return Mono.empty();
                        })
        )
        .bindNow();
someTcpServer.onDispose().block();
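For context, processRequest() looks roughly like this; validate(...) and buildResponse(...) are placeholders for my real business logic:

    // Sketch of processRequest(); validate(...) and buildResponse(...) stand in
    // for the real business logic. It returns a Mono that errors with one of
    // the business exceptions when processing fails.
    private Mono<byte[]> processRequest(byte[] request) {
        return Mono.fromCallable(() -> {
            validate(request);              // may throw BusinessLogicException1/2
            return buildResponse(request);
        });
    }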
Then, in my custom TcpServerHandler class, I handle the custom exceptions in exceptionCaught(...), as shown below.
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

@Slf4j
public class TcpServerHandler extends ChannelDuplexHandler {

    private final AtomicLong startTime = new AtomicLong(0L);
    private final AtomicLong endTime = new AtomicLong(0L);

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof BusinessLogicException1 ||
                cause instanceof BusinessLogicException2) {
            endTime.set(System.nanoTime());
            log.info("Took {} ms to process", Duration.ofNanos(endTime.get() - startTime.get()).toMillis());
            // for these specific exceptions, keep the connection open
            ctx.fireChannelActive();
        } else {
            // When an exception that wasn't one of the above^ was thrown,
            // I had super.exceptionCaught(...) here, and this was causing my
            // exceptionCaught(...) method to be called twice, so I removed the
            // call to super.exceptionCaught(...) and just don't do anything.
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        startTime.set(System.nanoTime());
        ctx.fireChannelRead(msg);
    }
}
This seems to be working, and even without the explicit call to super.exceptionCaught(...), I can see that Netty properly closes the connection when any exception other than the ones I specified is thrown. I just wanted to know if this is the correct approach, or if there is a better way to achieve this, as I am still new to Netty.
You can try publish() + autoConnect() + retry(). publish() and autoConnect() turn the inbound stream into a hot source, so when retry() resubscribes after an error, it reattaches to the same stream on the same connection instead of the stream terminating and the connection being closed:
.handle((inbound, outbound) ->
        inbound
                .receive()
                .asByteArray()
                .publish()
                .autoConnect()
                .flatMap(req -> processRequest(req))
                .flatMap(rsp -> outbound.sendByteArray(Flux.just(rsp)))
                .retry()
)
You can find more about those operators here.
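If you only want to keep the connection open for your specific exceptions, one possible variant is a filtered retry. This is a sketch, assuming your BusinessLogicException1/2 types and using reactor.util.retry.Retry from Reactor Core:

    .handle((inbound, outbound) ->
            inbound
                    .receive()
                    .asByteArray()
                    .publish()
                    .autoConnect()
                    .flatMap(req -> processRequest(req))
                    .flatMap(rsp -> outbound.sendByteArray(Flux.just(rsp)))
                    // resubscribe only for the business exceptions; any other
                    // error terminates the handler and the connection is
                    // closed as usual
                    .retryWhen(Retry.indefinitely()
                            .filter(t -> t instanceof BusinessLogicException1
                                    || t instanceof BusinessLogicException2))
    )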