java ssl java-8 netty netty-socketio

SSL handshake does not happen in the pipeline when reconnecting to servers


The problem I'm facing is this:

  1. In the Main class, I connect to a server and attach a channel listener for follow-up actions.
  2. If the connection is established successfully, the SSL handshake completes without any problem.
  3. If the connection in step 1 fails, I try to connect to the same or a different server and attach the same channel listener again, as in step 1.
  4. I expect the SSL handshake to happen as in step 2 once the connection is established, but it does not, even if I force it by calling the renegotiate method on SslHandler.

Expected behavior

If a connection attempt made through the bootstrap object fails with an exception, the SSL handshake should still happen on the next attempt.

Actual behavior

The SSL handshake is skipped on retry, and the attempt fails with "UnknownMessage type expected(ByteBuf)".

Steps to reproduce

  1. The Main class, which makes the initial connection:
    public class Main {    
    private final static Logger LOGGER = LoggerFactory.getLogger(Main.class);    
    public static void main(String[] args) {
    
        ClientConfig clientConfig = null;
    
        LOGGER.info("initializing Agent Stats uploader");
        // Set up.
        InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
        Bootstrap clientBootstrap = getBootstrap();
    
        clientConfig = ClientConfig.getInstance();
    
    
        InetSocketAddress server = clientConfig.getPrimaryScnInetAddrs();
        // Objects.nonNull only returns a boolean; requireNonNull fails fast instead.
        Objects.requireNonNull(server, "primary server address is not configured");
    
        // Make a new connection.
        LOGGER.info("Initialization complete, ready to connect to the host and port  {}:{}", server.getHostName(),
                server.getPort());
    
        ServerChannelFutureListener serverChannelFutureListener = ServerChannelFutureListener.getInstance();
        serverChannelFutureListener.setClientBootStrap(clientBootstrap);
    
        ChannelPromise channelPromise =
                (ChannelPromise) clientBootstrap.connect(server).addListener(serverChannelFutureListener);
    
        EventLoopGroup eventGroupExecutor = clientBootstrap.config().group();
        AgentStatsProcess agentStatsThread = AgentStatsProcess.getInstance();
        agentStatsThread.setParentChannelFuture(channelPromise);
        eventGroupExecutor.scheduleAtFixedRate(agentStatsThread, clientConfig.getInitialDelay(),
                clientConfig.getScheduleInterval(), TimeUnit.SECONDS);
        LOGGER.info("Scheduled Agent Stats uploading, should start in 30 secs");
    
        LOGGER.info("Connection complete");
    
    
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                LOGGER.info("Killing AgentStatUploader Thread");
                eventGroupExecutor.shutdownGracefully();
        }));
    }
    
    public static final Bootstrap getBootstrap() {
        EventLoopGroup group = new NioEventLoopGroup();
    
        Bootstrap b = new Bootstrap();
        b.group(group);
        b.channel(NioSocketChannel.class);
        b.handler(new AgentStatsChannelInitializationHandler());
    
        b.option(ChannelOption.SO_KEEPALIVE, true);
        b.option(ChannelOption.TCP_NODELAY, true);
    
        return b;
    }
    
    }
  2. The channel future listener that implements the retry logic for step 1:
    public final class ServerChannelFutureListener implements GenericFutureListener<ChannelFuture> {
        private static final Logger logger = LoggerFactory.getLogger(ServerChannelFutureListener.class.getName());
        private static ServerChannelFutureListener instance;
    
        private AtomicInteger count = new AtomicInteger(1);
        private ClientConfig clientConfig = ClientConfig.getInstance();
        private boolean isPrimary = true;
        private ChannelFuture channelFuture;
        private Bootstrap clientBootStrap;
        private long timeout;
    
        private ServerChannelFutureListener() {
            this.timeout = clientConfig.getRetryAfter();
        }
    
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            channelFuture = future;
            int maxretries = clientConfig.getMaxRetries();
            if (!future.isSuccess()) {
                logger.info("Connection to {} scn is not successful, retrying ({}/{})", getServerType(), count.get(), maxretries);
                logger.debug("Connection to server failed with error: ", future.cause());
                if (count.incrementAndGet() > maxretries) {
                    // Still failing after max retries: switch to the other server.
                    logger.info("Failed to connect to {} server, will try to connect to {} now.",
                            getServerType(),
                            isPrimary() ? "SECONDARY" : "PRIMARY");
    
                    count.set(1);
                    isPrimary = !isPrimary();
                    this.timeout = clientConfig.getRetryAfter();
                    logger.info("Connecting Server type changed, so resetting timeout: {}", this.timeout);
                } else {
                    // Retry the same server with a longer back-off.
                    logger.info("Exponential Back-off set to: {} secs, waiting for next server connection", this.timeout);
                    //TimeUnit.SECONDS.sleep(this.timeout);
                    this.timeout = ExpontentialBackOff.getNextBackOff(this.timeout);
                }
    
                InetSocketAddress server = getServer();
                logger.info("Initialization complete, ready to connect to the host and port  {}:{}", server.getHostName(),
                        server.getPort());
                // Each connect() creates a new Channel, so the channel initializer runs again for it.
                channelFuture = clientBootStrap.connect(server).addListener(this);
            } else {
                logger.info("Using Connection with config: {}, to Server {} ", future.channel().config(),
                        future.channel().localAddress());
                this.timeout = clientConfig.getRetryAfter();
                logger.info("Time out Back-off reset to: {} for next server connection", this.timeout);
            }
            AgentStatsProcess.getInstance().setParentChannelFuture(channelFuture);
        }
    
        private String getServerType() {
            return isPrimary() ? "PRIMARY" : "SECONDARY";
        }
    
        private InetSocketAddress getServer() {
            return isPrimary() ? clientConfig.getPrimaryScnInetAddrs() : clientConfig.getSecondaryScnInetAddrs();
        }
    
        public static ServerChannelFutureListener getInstance() {
            if (null == instance) {
                instance = new ServerChannelFutureListener();
            }
            return instance;
        }
    
        public boolean isPrimary() {
            return isPrimary;
        }
    
        public ChannelFuture getChannelFuture() {
            return channelFuture;
        }
    
        public void setClientBootStrap(Bootstrap cb) {
            this.clientBootStrap = cb;
        }
    }
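
The listener relies on ExpontentialBackOff.getNextBackOff, which is not shown here. For readers following along, here is a minimal sketch of what such a helper might look like, assuming it doubles the delay up to a cap. The class name BackOffSketch, the method nextBackOff, and the 300-second cap are all hypothetical and not taken from the code above.

```java
public final class BackOffSketch {
    // Assumed upper bound for the delay, in seconds (hypothetical value).
    private static final long MAX_BACKOFF_SECS = 300;

    private BackOffSketch() {
    }

    /** Returns the next delay: double the current one, capped at MAX_BACKOFF_SECS. */
    static long nextBackOff(long currentSecs) {
        if (currentSecs <= 0) {
            return 1; // start from one second if no sensible delay was set
        }
        // Compare against half the cap first so the multiplication cannot overflow.
        return currentSecs >= MAX_BACKOFF_SECS / 2 ? MAX_BACKOFF_SECS : currentSecs * 2;
    }

    public static void main(String[] args) {
        long delay = 5;
        for (int attempt = 1; attempt <= 4; attempt++) {
            System.out.println("attempt " + attempt + " waits " + delay + "s");
            delay = nextBackOff(delay);
        }
    }
}
```

Starting from 5 seconds, the delays in this sketch go 5, 10, 20, 40 and stop growing at 300.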

I expect the SSL handshake to happen after a reconnect, but it fails.

Netty version: 4.1.12.Final


Solution

  • I fixed this issue. The culprit is "ProtobufVarint32FrameDecoder" and its parent class "ByteToMessageDecoder". "ByteToMessageDecoder" ensures that its subclasses are not shareable.

    Because these classes are not shareable, every time the code tries to reconnect using the bootstrap, the initializer class fails to add the handlers to the pipeline, which results in "ctx.close()" and a pipeline with no handlers.

    As a workaround I added those two classes to my project, and I raised bug #10371 to address this issue.
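
    To make the failure mode concrete, here is a toy model in plain Java. It does not use Netty's classes: ToyDecoder, ToyPipeline and connectTwice are hypothetical stand-ins that only mimic the rule "a non-shareable handler instance can be added to one pipeline only". It compares reusing one decoder instance across connection attempts with creating a fresh instance per attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SharableSketch {
    /** Toy stand-in for a stateful, non-shareable handler (not a Netty class). */
    static class ToyDecoder {
        private boolean added; // remembers whether some pipeline already holds it
    }

    /** Toy stand-in for a pipeline that rejects a handler already used elsewhere. */
    static class ToyPipeline {
        final List<ToyDecoder> handlers = new ArrayList<>();

        void addLast(ToyDecoder decoder) {
            if (decoder.added) {
                throw new IllegalStateException("handler is not shareable");
            }
            decoder.added = true;
            handlers.add(decoder);
        }
    }

    /** Simulates two connect attempts; returns how many pipelines were initialized. */
    static int connectTwice(Supplier<ToyDecoder> decoderSource) {
        int initialized = 0;
        for (int attempt = 0; attempt < 2; attempt++) {
            ToyPipeline pipeline = new ToyPipeline(); // every attempt gets a new pipeline
            try {
                pipeline.addLast(decoderSource.get());
                initialized++;
            } catch (IllegalStateException e) {
                // Initialization failed: this pipeline is left without handlers.
            }
        }
        return initialized;
    }

    public static void main(String[] args) {
        ToyDecoder shared = new ToyDecoder();
        System.out.println(connectTwice(() -> shared));    // prints 1: the second attempt is rejected
        System.out.println(connectTwice(ToyDecoder::new)); // prints 2: a fresh instance per attempt
    }
}
```

    In Netty terms, the second variant would correspond to constructing the decoder inside the initializer's initChannel method instead of keeping one instance in a field. AgentStatsChannelInitializationHandler is not shown in the question, so whether it holds handler instances in fields is an assumption.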