java scala performance netty nio

Why is the following Java NIO server so slow compared to Netty?


I have the following Scala implementation of a simple web server built on the Java NIO API.

package zion

import java.net._
import java.nio.ByteBuffer
import java.nio.channels._

object NHello {

  import java.nio.CharBuffer
  import java.nio.charset.Charset

  def helloWorldBytes: ByteBuffer = Charset
    .forName("ISO-8859-1")
    .newEncoder
    .encode(CharBuffer.wrap(httpResponse("NHello World\n")))

  def httpResponse(content: String): String = {
    val rn = "\r\n"
    List(
      "HTTP/1.1 200 OK",
      "Content-Type: text/html",
      "Connection: Keep-Alive",
      s"Content-Length: ${content.length()}",
      rn + content
    ).mkString(rn)
  }

  def main(args: Array[String]): Unit = {
    val port    = 8080
    val address = new InetSocketAddress(port)

    // Server Socket Channel
    val serverSocketChannel = ServerSocketChannel.open()
    serverSocketChannel.bind(address)
    serverSocketChannel.configureBlocking(false)

    // Selector
    val selector = Selector.open()
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)

    while (true) {
      selector.select()
      val iterator = selector.selectedKeys().iterator()
      while (iterator.hasNext) {
        val key = iterator.next()
        if (key.isAcceptable) {
          val channel = serverSocketChannel.accept()
          channel.write(helloWorldBytes)
          channel.close()
        }
        // Remove each key once it is handled so it is not processed again
        iterator.remove()
      }
    }

    sys.addShutdownHook({
      println("Shutting down...")
      serverSocketChannel.close()
    })

    println("Exiting...")
  }
}

Benchmarking with wrk, I get only a few thousand requests per second.

wrk -t12 -c100 -d10s http://127.0.0.1:8080

This seems too slow compared to Netty, which gives me at least 10 to 15 times the throughput. Considering that Netty is also built on top of NIO, what am I doing wrong?

Are there some obvious performance optimizations that I am missing?


Solution

  • After some further searching and analysis, I have figured out the problems in the code above.

    def main(args: Array[String]): Unit = {
      val port    = 8080
      val address = new InetSocketAddress(port)

      val serverSocketChannel = ServerSocketChannel.open()
      serverSocketChannel.bind(address)
      serverSocketChannel.configureBlocking(false)

      val selector = Selector.open()
      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)

      while (true) {
        selector.select()
        val iterator = selector.selectedKeys().iterator()
        while (iterator.hasNext) {
          val key = iterator.next()
          if (key.isAcceptable) {
            val channel = serverSocketChannel.accept()
            // 1. Blocking Write
            channel.write(helloWorldBytes)
            // 2. Blocking Close
            channel.close()
          }
          // Remove each key once it is handled so it is not processed again
          iterator.remove()
        }
      }

      sys.addShutdownHook({
        println("Shutting down...")
        serverSocketChannel.close()
      })

      println("Exiting...")
    }
    

    The main problems were:

    1. Blocking write: the channel returned by accept() is in blocking mode, so write() does not return until the bytes have been written to the socket. Until then the loop cannot accept any more connections, so pending connections sit idle and the throughput of the web server suffers.

    2. Blocking close: the close() call is also blocking and takes time to complete. Until the connection is closed, no new connections are accepted and no already-accepted connections are answered.

    There is another problem with closing the connection: creating a new connection is expensive, and tools like wrk keep their connections open and reuse them across requests instead of opening a new one each time. Closing the connection on the server after every request forces a reconnect each time, which becomes a performance killer and skews the benchmark.
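Both blocking calls can also be avoided while staying with the selector API. The following is only a minimal sketch of that idea, not the implementation from this answer: accepted channels are switched to non-blocking mode, registered for `OP_READ`, and left open for reuse. It assumes one request per read and that the small response fits into a single write; a complete server would keep the unwritten remainder and register for `OP_WRITE`. The names `SelectorSketch`, `handle` and `serve` are made up for illustration.

```scala
import java.io.IOException
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.nio.charset.StandardCharsets

object SelectorSketch {
  private val body = "NHello World\n"

  // Response encoded once; every write uses its own duplicate() view of it
  val response: ByteBuffer = ByteBuffer.wrap(
    (s"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: Keep-Alive\r\n" +
      s"Content-Length: ${body.length}\r\n\r\n$body").getBytes(StandardCharsets.ISO_8859_1)
  )

  // Handles one ready key without blocking the selector thread
  def handle(key: SelectionKey, selector: Selector): Unit = {
    if (key.isAcceptable) {
      val client = key.channel().asInstanceOf[ServerSocketChannel].accept()
      if (client != null) {
        // Accepted channels start in blocking mode, so switch explicitly
        client.configureBlocking(false)
        client.register(selector, SelectionKey.OP_READ)
      }
    } else if (key.isReadable) {
      val client = key.channel().asInstanceOf[SocketChannel]
      val in     = ByteBuffer.allocate(2048)
      val n      = client.read(in)
      if (n == -1) client.close() // peer closed the connection
      // Assumption: one request per read, and the response fits in one write
      else if (n > 0) client.write(response.duplicate())
    }
  }

  def serve(port: Int): Unit = {
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress(port))
    server.configureBlocking(false)
    val selector = Selector.open()
    server.register(selector, SelectionKey.OP_ACCEPT)
    while (true) {
      selector.select()
      val it = selector.selectedKeys().iterator()
      while (it.hasNext) {
        val key = it.next()
        it.remove() // each key is removed as soon as it is picked up
        try handle(key, selector)
        catch { case _: IOException => key.channel().close() }
      }
    }
  }
}
```

Calling `SelectorSketch.serve(8080)` would run the loop on the calling thread. The connection is closed only when the client signals EOF, so a keep-alive client such as wrk can reuse it.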

    Here is an alternative, "highly performant" implementation:

    package zion
    
    import java.io.IOException
    import java.net.InetSocketAddress
    import java.nio.{ByteBuffer, CharBuffer}
    import java.nio.charset.Charset
    import java.nio.channels.{
      AsynchronousChannelGroup,
      AsynchronousServerSocketChannel,
      AsynchronousSocketChannel,
      CompletionHandler
    }
    import java.util.concurrent.{Executors, TimeUnit}
    
    /**
      * This is potentially as fast as it can get using NIO APIs.
      */
    object HelloAsyncNIO {
      // Create a thread pool for the channel group.
      // A single thread for events would probably be better.
      // The pool could be shared between the socket server and, in future, socket clients.
      private val group =
        AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(24))
    
      // Socket to accept connections
      private val serverSocketChannel = AsynchronousServerSocketChannel.open(group)
    
      // Port to be used to connect
      private val PORT = 8081
    
      // Flag to handle logging
      private val ENABLE_LOGGING = false
    
      /**
        * Contains utilities to manage read/write on the socket channels
        */
      object NIOBuffer {
        def helloWorldBytes: ByteBuffer = Charset
          .forName("ISO-8859-1")
          .newEncoder
          .encode(CharBuffer.wrap(httpResponse("NHello World\n")))
    
        def httpResponse(content: String): String = {
          val rn = "\r\n"
          List(
            "HTTP/1.1 200 OK",
            "Content-Type: text/html",
            "Connection: Keep-Alive",
            s"Content-Length: ${content.length()}",
            rn + content
          ).mkString(rn)
        }
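        // helloWorldBytes already returns a ByteBuffer, so it is stored as-is;
        // every write works on its own duplicate() of it.
        private val writeByteBuffer = helloWorldBytes
        private val readByteBuffer  = ByteBuffer.allocateDirect(1024 * 2) // 2 KB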
        def read(
            socket: AsynchronousSocketChannel
        )(h: CompletionHandler[Integer, AsynchronousSocketChannel]): Unit =
          socket.read(readByteBuffer.duplicate(), socket, h)
        def write(
            socket: AsynchronousSocketChannel
        )(h: CompletionHandler[Integer, AsynchronousSocketChannel]): Unit =
          socket.write(writeByteBuffer.duplicate(), socket, h)
      }
    
      // Generic async completion handler
      case class Handle[V, A](cb: (V, A) => Unit) extends CompletionHandler[V, A] {
        override def completed(result: V, attachment: A): Unit =
          cb(result, attachment)
        override def failed(cause: Throwable, attachment: A): Unit = {
          cause match {
            case e: IOException => log(e.getMessage)
            case _              => cause.printStackTrace()
          }
        }
      }
    
      // Logging utility
      def log(input: Any*): Unit = {
        if (ENABLE_LOGGING) println(input.map(_.toString).mkString(", "))
      }
    
      private val onAccept
          : Handle[AsynchronousSocketChannel, AsynchronousServerSocketChannel] =
        Handle[AsynchronousSocketChannel, AsynchronousServerSocketChannel](
          (socket, server) => {
            log("\nACCEPT")
    
            // Accept new connections immediately
            server.accept(serverSocketChannel, onAccept)
    
            // Read from the current socket
            NIOBuffer.read(socket)(onRead)
          }
        )
    
      private val onRead: Handle[Integer, AsynchronousSocketChannel] =
        Handle[Integer, AsynchronousSocketChannel]((bytes, socket) => {
          log("READ", bytes)
    
          // EOF, meaning connection can be closed
          if (bytes == -1) socket.close()
    
          // Some data was read and now we can respond back
          else if (bytes > 0) NIOBuffer.write(socket)(onWrite)
    
        })
    
      private val onWrite: Handle[Integer, AsynchronousSocketChannel] =
        Handle[Integer, AsynchronousSocketChannel]((bytes, socket) => {
          log("WRITE", bytes)
    
          // Read from the socket
          NIOBuffer.read(socket)(onRead)
        })
    
      def main(args: Array[String]): Unit = {
    
        // Setup socket channel
        serverSocketChannel.bind(new InetSocketAddress(PORT))
        serverSocketChannel.accept(serverSocketChannel, onAccept)
    
        // Making the main thread wait
        group.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
      }
    }
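The `duplicate()` calls in `NIOBuffer` are what make it safe to encode the response only once. A duplicate shares the underlying bytes but has its own position and limit, so draining one view leaves the shared buffer untouched. The sketch below illustrates this in isolation; `DuplicateSketch` and `consumeView` are hypothetical names, and copying into an array stands in for a socket write.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object DuplicateSketch {
  // Encoded once at start-up and never consumed directly
  val shared: ByteBuffer =
    ByteBuffer.wrap("NHello World\n".getBytes(StandardCharsets.ISO_8859_1)).asReadOnlyBuffer()

  // Drains a fresh view, as a socket write would, and returns the bytes consumed
  def consumeView(): Int = {
    val view = shared.duplicate() // same bytes, independent position and limit
    val out  = new Array[Byte](view.remaining())
    view.get(out) // advances only the view's position
    out.length
  }
}
```

The same sharing applies to the read buffer: all connections read into the same memory, which is harmless here only because the request content is never inspected.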