android kotlin websocket kotlin-coroutines kotlin-coroutine-channel

Android Kotlin Coroutine Channel message not sent in WebSocket callback


I am developing an Android chat app using WebSocket (OkHttp).

To do this, I extended the okhttp3.WebSocketListener class, and I receive the chat messages in the onMessage callback method.

I already built this with an Rx PublishSubject, and it works fine. Now I want to switch to a coroutine Channel.

To do this, I added the Channel in my WebSocketListener class.

@Singleton
class MyWebSocketService @Inject constructor(
    private val ioDispatcher: CoroutineDispatcher
): WebSocketListener() {

  // previous
  val messageSubject: PublishSubject<WsMsg> = PublishSubject.create()

  // new
  val messageChannel: Channel<WsMsg> by lazy { Channel() }

  override fun onMessage(webSocket: WebSocket, text: String) {
    super.onMessage(webSocket, text)

    // previous
    messageSubject.onNext(text)

    // new
    runBlocking(ioDispatcher) {
      Log.d(TAG, "message: $text")
      messageChannel.send(text)
    }
  }
}

But the coroutine channel version doesn't work: it receives and logs only the first message, and nothing is logged from the second message onward.

But when I change the code like below, it works!

  override fun onMessage(webSocket: WebSocket, text: String) {
    super.onMessage(webSocket, text)

    GlobalScope.launch(ioDispatcher) {
      Log.d(TAG, "message: $text")
      messageChannel.send(text)
    }
  }

The difference is runBlocking vs. GlobalScope. I heard that GlobalScope may not preserve the ordering of messages, so it is not suitable for a chat app.

How can I solve this issue?


Solution

  • The default Channel() has no buffer (it is a rendezvous channel), so send(message) suspends until a consumer of the channel calls channel.receive() (which happens implicitly in a for (element in channel) {} loop).

    Since you are using runBlocking, suspending effectively means blocking the current thread. It appears that OkHttp always delivers messages on the same thread, so it cannot deliver the next message while you are still blocking that thread.

    The correct solution is to add a buffer to your channel. If messages are unlikely to arrive faster than you can process them, you can simply replace Channel() with Channel(Channel.UNLIMITED).
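
The buffered-channel idea above can be sketched as follows. This is a minimal illustration, not the asker's actual class: MessageRelay is a hypothetical stand-in for the listener, and the OkHttp base class is left out so the snippet runs with only kotlinx.coroutines on the classpath. It also swaps send for trySend (available in kotlinx.coroutines 1.5 and later), which is an addition to the answer's suggestion: with an unlimited buffer the callback never needs to suspend, so neither runBlocking nor GlobalScope.launch is required.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the WebSocketListener subclass in the question.
class MessageRelay {
    // UNLIMITED capacity: sending never waits for a receiver.
    val messageChannel = Channel<String>(Channel.UNLIMITED)

    // Plays the role of onMessage: called from a plain (non-coroutine) thread.
    fun onMessage(text: String) {
        // trySend is a regular, non-suspending function. On an UNLIMITED
        // channel it fails only if the channel has already been closed.
        val result = messageChannel.trySend(text)
        if (result.isFailure) println("dropped (channel closed): $text")
    }

    fun close() {
        messageChannel.close()
    }
}

fun main() = runBlocking {
    val relay = MessageRelay()

    // Simulate three messages arriving before any consumer is ready.
    listOf("first", "second", "third").forEach(relay::onMessage)
    relay.close()

    // The for loop calls receive() implicitly and ends once the channel
    // is closed and drained.
    val received = mutableListOf<String>()
    for (msg in relay.messageChannel) received += msg
    println(received) // prints [first, second, third]
}
```

Because the callback thread enqueues each message itself before returning, messages enter the channel in the order OkHttp delivers them, which addresses the ordering concern raised about GlobalScope.launch. The trade-off is that an unlimited buffer can grow without bound if the consumer falls behind.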