nettyreconnectswift-nio

Swift-NIO TCP Client auto reconnect


I write a TCP Client in Swift-NIO to connect Netty TCP Server. I want tcp client can auto reconnect when needed.

import Foundation
import NIO

class MessageHandler: ChannelInboundHandler {
    let notificationMessage = NSNotification.Name(rawValue: "Location")
    public typealias InboundIn = ByteBuffer
    public typealias OutboundOut = ByteBuffer
    private var numBytes = 0
    private var task: RepeatedTask? = nil
    private var bootstrap: ClientBootstrap

    init(bootstrap: ClientBootstrap) {
        self.bootstrap = bootstrap
    }

    public func channelActive(context: ChannelHandlerContext) {
        print("Reconnect Successful")

        self.task?.cancel()
        context.fireChannelActive()
    }

    func channelInactive(context: ChannelHandlerContext) {
        self.task = context.channel.eventLoop.scheduleRepeatedTask(initialDelay: TimeAmount.seconds(0), delay: TimeAmount.seconds(10), { (RepeatedTask) in
            print("Reconnecting...")

            try { () -> EventLoopFuture<Channel> in
                return try self.bootstrap.connect(host: SystemUtil.getConfig(key: "IP") as! String, port: SystemUtil.getConfig(key: "TCPPort") as! Int)
                }()
        })

        context.fireChannelInactive()
    }

    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        var buffer = unwrapInboundIn(data)
        let readableBytes = buffer.readableBytes
        if let message = buffer.readString(length: readableBytes) {
            print(message)
            let dictMessage = ["Location": message]
            NotificationCenter.default.post(name: notificationMessage , object:MessageHandler.self, userInfo: dictMessage)
        }
    }

    public func errorCaught(context: ChannelHandlerContext, error: Error) {
        print("error: ", error)

        // As we are not really interested getting notified on success or failure we just pass nil as promise to
        // reduce allocations.
        context.close(promise: nil)
    }
}

It about works but something not right. I use eventLoop.scheduleRepeatedTask to check every 10s, when connected then cancel the RepeatedTask.But self.task?.cancel() not work, I looked source code for cancel. What is the right method to cancel a RepeatedTask? Thanks

private func cancel0(localCancellationPromise: EventLoopPromise<Void>?) {
        self.eventLoop.assertInEventLoop()
        self.scheduled?.cancel()
        self.scheduled = nil
        self.task = nil

        // Possible states at this time are:
        //  1) Task is scheduled but has not yet executed.
        //  2) Task is currently executing and invoked `cancel()` on itself.
        //  3) Task is currently executing and `cancel0()` has been reentrantly invoked.
        //  4) NOT VALID: Task is currently executing and has NOT invoked `cancel()` (`EventLoop` guarantees serial execution)
        //  5) NOT VALID: Task has completed execution in a success state (`reschedule()` ensures state #2).
        //  6) Task has completed execution in a failure state.
        //  7) Task has been fully cancelled at a previous time.
        //
        // It is desirable that the task has fully completed any execution before any cancellation promise is
        // fulfilled. States 2 and 3 occur during execution, so the requirement is implemented by deferring
        // fulfillment to the next `EventLoop` cycle. The delay is harmless to other states and distinguishing
        // them from 2 and 3 is not practical (or necessarily possible), so is used unconditionally. Check the
        // promises for nil so as not to otherwise invoke `execute()` unnecessarily.
        if self.cancellationPromise != nil || localCancellationPromise != nil {
            self.eventLoop.execute {
                self.cancellationPromise?.succeed(())
                localCancellationPromise?.succeed(())
            }
        }
    }

Yes the task is nil so cancel don't work. I change global variable to static

static var task: RepeatedTask? = nil

Now works fine.

But I still not sure what is the best practice with auto reconnect in Swift-NIO. In my Android App I used Netty for TCP Client like this

private inner class ConnectServerThread : Thread() {
    override fun run() {
        super.run()

        val workerGroup = NioEventLoopGroup()

        try {
            val bootstrap = Bootstrap()
            bootstrap.group(workerGroup)
                .channel(NioSocketChannel::class.java)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_REUSEADDR, true)
                .handler(object : ChannelInitializer<SocketChannel>() {
                    public override fun initChannel(ch: SocketChannel) {
                        ch.pipeline().addLast(
                            ReconnectHandler(bootstrap, channel),
                            StringEncoder(StandardCharsets.UTF_8),
                            StringDecoder(StandardCharsets.UTF_8),
                            MessageHandlerAdapter()
                        )
                    }
                })
            val channelFuture = bootstrap.connect(
                InetSocketAddress(
                    ConfigUtil.config!!.ip,
                    ConfigUtil.config!!.tcpPort!!.toInt()
                )
            ).sync()
            channelFuture.addListener {
                getConnectionListener()
            }
            channel = channelFuture.channel() as SocketChannel
        } catch (e: Exception) {
            Log.d("SystemService", e.toString())
        }
    }
}

I used ReconnectHandler for reconnect and getConnectionListener for listen. In Swift-NIO is there similar Listener or other solution?


Solution

  • The solution for SwiftNIO will require your handler to attach callbacks to the future returned from connect. These callbacks can close over the repeating task, and so can cancel it when a connection completes. For example:

    import Foundation
    import NIO
    
    final class Reconnector {
        private var task: RepeatedTask? = nil
        private let bootstrap: ClientBootstrap
    
        init(bootstrap: ClientBootstrap) {
            self.bootstrap = bootstrap
        }
    
        func reconnect(on loop: EventLoop) {
            self.task = loop.scheduleRepeatedTask(initialDelay: .seconds(0), delay: .seconds(10)) { task in
                print("reconnecting")
                try self._tryReconnect()
            }
        }
    
        private func _tryReconnect() throws {
            try self.bootstrap.connect(host: SystemUtil.getConfig(key: "IP") as! String, port: SystemUtil.getConfig(key: "TCPPort") as! Int).whenSuccess { _ in
                print("reconnect successful!")
                self.task?.cancel()
                self.task = nil
            }
        }
    }
    
    class MessageHandler: ChannelInboundHandler {
        let notificationMessage = NSNotification.Name(rawValue: "Location")
        public typealias InboundIn = ByteBuffer
        public typealias OutboundOut = ByteBuffer
        private var numBytes = 0
        private let reconnect: Reconnector
    
        init(bootstrap: ClientBootstrap) {
            self.reconnector = Reconnector(bootstrap: bootstrap)
        }
    
        func channelInactive(context: ChannelHandlerContext) {
            self.reconnector.reconnect()
            context.fireChannelInactive()
        }
    
        public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
            var buffer = unwrapInboundIn(data)
            let readableBytes = buffer.readableBytes
            if let message = buffer.readString(length: readableBytes) {
                print(message)
                let dictMessage = ["Location": message]
                NotificationCenter.default.post(name: notificationMessage , object:MessageHandler.self, userInfo: dictMessage)
            }
        }
    
        public func errorCaught(context: ChannelHandlerContext, error: Error) {
            print("error: ", error)
    
            // As we are not really interested getting notified on success or failure we just pass nil as promise to
            // reduce allocations.
            context.close(promise: nil)
        }
    }