Netty TCP Server keep connection open on custom exceptions


I wrote a TCP server using Project Reactor Netty (reactor-netty) that receives a byte-array request message from a client, processes it, and returns a byte-array response message to the client. The connections between server and client are mostly long-lived unless some kind of fatal error occurs.

I have a use case where, if my server throws certain exceptions while processing the client's request, I need to keep the connection open but not send any data back to the client. Currently, when those exceptions are thrown, Netty automatically closes the connection. I tried adding an exceptionCaught(...) override in my custom handler, but it never seems to be reached. Setting ChannelOption.AUTO_CLOSE to false does not work either, because that option seems to apply only to exceptions thrown during a write(). In my use case, we never write any data back to the client.

Below is the workaround I used to make sure exceptionCaught() gets fired, so that I can handle the exceptions appropriately and keep the connection open:

    // tcpServer is assumed to be a reactor.netty.tcp.TcpServer, e.g. TcpServer.create()
    DisposableServer someTcpServer = tcpServer
            .host("12.123.456.789")
            .port(12345)
            .wiretap(true)
            .doOnBind(server -> log.info("Starting listener..."))
            .doOnBound(server -> log.info("Listener started on host: {}, port: {}", server.host(), server.port()))
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.AUTO_CLOSE, false)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .doOnConnection(connection -> {
                InetSocketAddress socketAddress = (InetSocketAddress) connection.channel().remoteAddress();
                log.info("Client has connected. Host: {}, Port: {}",
                        socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
            })
            .doOnChannelInit((observer, channel, remoteAddress) ->
                    channel.pipeline()
                            .addFirst(new TcpServerHandler())
            )
            .handle((inbound, outbound) ->
                    inbound
                            .receive()
                            .asByteArray()
                            .flatMap(req -> processRequest(req))
                            .flatMap(rsp -> outbound.sendByteArray(Flux.just(rsp)))
                            // any exceptions thrown during the above processRequest()...
                            .onErrorResume(throwable -> {
                                // ...will get handled here
                                inbound.withConnection(connection ->
                                        connection.channel().pipeline().fireExceptionCaught(throwable));
                                return Mono.empty();
                            })
            ).bindNow();
    someTcpServer.onDispose().block();

Then, in my custom TcpServerHandler class, I handle the custom exceptions in exceptionCaught(...), as shown below.

    import io.netty.channel.ChannelDuplexHandler;
    import io.netty.channel.ChannelHandlerContext;
    import java.time.Duration;
    import java.util.concurrent.atomic.AtomicLong;
    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    public class TcpServerHandler extends ChannelDuplexHandler {

        private final AtomicLong startTime = new AtomicLong(0L);
        private final AtomicLong endTime = new AtomicLong(0L);

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (cause instanceof BusinessLogicException1 ||
                    cause instanceof BusinessLogicException2) {
                endTime.set(System.nanoTime());
                log.info("Took {} ms to process", Duration.ofNanos(endTime.get() - startTime.get()).toMillis());
                // For these specific exceptions, keep the connection open
                ctx.fireChannelActive();
            } else {
                // For any other exception I originally called super.exceptionCaught(...),
                // but that caused this exceptionCaught(...) method to be called twice,
                // so I removed the call and do nothing here.
            }
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // Record when the request started so the processing time can be logged
            startTime.set(System.nanoTime());
            ctx.fireChannelRead(msg);
        }
    }

This seems to be working, and even without the explicit call to super.exceptionCaught(...), I can see that Netty properly closes the connection when any exception other than the ones I specified is thrown. I just want to know whether this is the correct approach, or whether there is a better way to achieve this, as I am still new to Netty.
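A common Reactor alternative to routing the error through exceptionCaught(...), sketched here under assumptions and not taken from the question or the answer, is to attach the fallback to each request instead of to the whole inbound Flux. In the workaround above, onErrorResume(...) sits on the outer Flux, so once it fires, the rest of the inbound stream is replaced by Mono.empty(). Scoping onErrorResume(...) to the per-request Mono inside flatMap(...) drops only the failed request while the outer stream keeps running, and any other exception still terminates it. The sketch uses plain reactor-core, with Flux.just(...) standing in for inbound.receive().asByteArray(); BusinessLogicException and processRequest(...) are hypothetical stand-ins for the question's exception classes and processing method.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PerRequestErrorHandling {

    // Hypothetical stand-in for the question's BusinessLogicException1/2.
    static class BusinessLogicException extends RuntimeException {
        BusinessLogicException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for processRequest(): rejects requests starting with "bad"
    // with a business exception, fails hard on "boom", and upper-cases everything else.
    static Mono<String> processRequest(String req) {
        if (req.startsWith("bad")) {
            return Mono.error(new BusinessLogicException("rejected: " + req));
        }
        if (req.equals("boom")) {
            return Mono.error(new IllegalStateException("fatal: " + req));
        }
        return Mono.just(req.toUpperCase());
    }

    // The fallback is attached to each request's Mono, not to the outer Flux:
    // a BusinessLogicException drops only that request and the outer stream continues,
    // while any other exception still propagates and terminates the outer stream.
    static Flux<String> handle(Flux<String> requests) {
        return requests.flatMap(req -> processRequest(req)
                .onErrorResume(BusinessLogicException.class, e -> Mono.empty()));
    }

    public static void main(String[] args) {
        List<String> responses = handle(Flux.just("a", "bad-1", "c")).collectList().block();
        System.out.println(responses); // [A, C]
    }
}
```

In the server from the question, the same idea would move the fallback into the first flatMap, for example .flatMap(req -> processRequest(req).onErrorResume(BusinessLogicException1.class, e -> Mono.empty())), with the timing log moved into that fallback. This assumes processRequest(...) returns a Mono; if it throws synchronously instead of returning Mono.error(...), wrapping the call in Mono.defer(() -> processRequest(req)) turns the thrown exception into an error signal that onErrorResume(...) can see.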


Solution

  • You can try publish() + autoConnect() + retry()

        .handle((inbound, outbound) ->
                inbound
                        .receive()
                        .asByteArray()
                        .publish()
                        .autoConnect()
                        .flatMap(req -> processRequest(req))
                        .flatMap(rsp -> outbound.sendByteArray(Flux.just(rsp)))
                        .retry()
        )
    

    You can find more about those operators here.
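The sketch below is a hedged illustration of why the answer combines these three operators, using plain reactor-core instead of a real connection. Reactor Netty's inbound receive() Flux accepts only one subscriber, so a bare retry() would try to subscribe to it a second time. publish().autoConnect() keeps one shared subscription to the source (autoConnect() does not disconnect when its subscribers cancel), and retry() then resubscribes only downstream of that shared subscription. A unicast Sinks.Many, which also allows just one subscriber, stands in for inbound.receive(); the "bad" prefix and the upper-casing are hypothetical stand-ins for the business exception and processRequest(...).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Sinks;

public class PublishRetryDemo {

    static List<String> run() {
        // Stand-in for inbound.receive(): a unicast sink accepts a single subscriber.
        Sinks.Many<String> inbound = Sinks.many().unicast().onBackpressureBuffer();
        List<String> processed = new CopyOnWriteArrayList<>();

        inbound.asFlux()
                .publish()      // one shared subscription to the single-subscriber source
                .autoConnect()  // connect on the first subscriber; never reconnects
                .map(req -> {
                    // Hypothetical business failure for requests starting with "bad"
                    if (req.startsWith("bad")) {
                        throw new IllegalArgumentException("rejected: " + req);
                    }
                    return req.toUpperCase();
                })
                .doOnNext(processed::add)
                .retry()        // on error, resubscribe below publish(), not to the source
                .subscribe();

        inbound.tryEmitNext("a").orThrow();
        inbound.tryEmitNext("bad-1").orThrow(); // fails; this request is dropped
        inbound.tryEmitNext("c").orThrow();     // still processed after the resubscription
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [A, C]
    }
}
```

Note that the failed request is dropped rather than retried, and retry() with no arguments resubscribes after any error, including ones that arguably should close the connection. If only the business exceptions should be swallowed, something like retryWhen(Retry.indefinitely().filter(e -> e instanceof BusinessLogicException1 || e instanceof BusinessLogicException2)), with Retry from reactor.util.retry, lets every other error terminate the handler as before.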