I have a Netty client with connect and auto-reconnect implemented in the following manner. The code has been working fine for many years.
Bootstrap initialisation
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup(NUM_OF_WORKER_THREADS, new NamedThreadFactory(client.hostname + "-%d")));
bootstrap.channel(NioSocketChannel.class)
         .handler(new MyChannelInitializer(sslContext, client))
         .option(ChannelOption.SO_KEEPALIVE, true)
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
Actual Connection logic
public void connect() {
    try {
        ChannelFuture cf = bootstrap.connect(hostname, port).sync().await();
    } catch (Exception e) {
        if (group != null && !group.isShutdown()) {
            logger.error("Shutting down Event loop group: {}, host: {}", group, hostname);
            group.shutdownGracefully();
        }
        throw new Exception("Connection failed to " + hostname, e);
    }
}
Reconnect
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    super.channelUnregistered(ctx);
    ctx.channel().close();
    // Connect again
    connect();
}
The reconnect trigger is hooked into the channelUnregistered callback. Recently I observed a strange issue where the connection went into CLOSE_WAIT state and the thread was stuck forever. Why is the connect timeout (3 seconds) not triggered?
Here is the thread dump for the specific callback processing.
I suspect the root cause is invoking blocking methods (sync and await) on a callback handler thread. Is there a deadlock?
"server-callback-worker-1" #214 prio=5 os_prio=0 cpu=1322.64ms elapsed=83237.37s tid=0x00007f1b0c005800 nid=0x1a9e89 in Object.wait() [0x00007f1b5c1fd000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(java.base@11.0.22/Native Method)
- waiting on <no object reference available>
at java.lang.Object.wait(java.base@11.0.22/Object.java:328)
at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:254)
- waiting to re-lock in wait() <0x0000000461b0aba8> (a io.netty.bootstrap.AbstractBootstrap$PendingRegistrationPromise)
at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:131)
at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:30)
at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:405)
at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:119)
at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:30)
at com.example.Client.connect(Client.java:113)
at com.example.Client.reconnect(Client.java:141)
at com.example.handler.ClientHandler.channelUnregistered(ClientHandler.java:72)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:219)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:195)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:188)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelUnregistered(DefaultChannelPipeline.java:1388)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:215)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:195)
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:821)
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:821)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(java.base@11.0.22/Thread.java:829)
I tried refactoring the reconnect logic to use a dedicated executor rather than doing the work on the callback handler thread. Is this the correct approach? Do we need to use the channel's EventLoop to schedule the reconnect?
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    super.channelUnregistered(ctx);
    ctx.channel().close();
    client.setConnected(false);
    scheduledExecutorService.schedule(this::reconnectAttempt, 3, TimeUnit.SECONDS);
}
You should not call await() or sync() from a thread inside the event loop; that is what caused the deadlock in your case.
Instead of calling await(), add a listener that contains your error handling code to the ChannelFuture returned by the bootstrap.
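A minimal non-blocking sketch of what that could look like, assuming your existing bootstrap, hostname, port, logger and group fields, and an illustrative 3-second retry delay:

import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.TimeUnit;

public void connect() {
    // The listener replaces sync()/await(), so no event loop thread is ever blocked.
    bootstrap.connect(hostname, port).addListener((ChannelFutureListener) future -> {
        if (future.isSuccess()) {
            logger.info("Connected to {}:{}", hostname, port);
        } else {
            logger.error("Connection failed to {}", hostname, future.cause());
            // Schedule a retry on the bootstrap's EventLoopGroup instead of blocking.
            group.schedule(this::connect, 3, TimeUnit.SECONDS);
        }
    });
}

Scheduling the retry on the channel's EventLoop (or on the bootstrap's group, as above) is fine, and so is your dedicated ScheduledExecutorService; what matters is that the reconnect code never blocks waiting for the connect to complete.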