I am developing an Android chat app using WebSocket (OkHttp).
To do this, I extended the okhttp3.WebSocketListener class, and I receive the chat messages in the onMessage callback method.
I already implemented it using an Rx PublishSubject, and it works fine. But I want to change it to a coroutine Channel.
To do this, I added a Channel to my WebSocketListener class.
@Singleton
class MyWebSocketService @Inject constructor(
    private val ioDispatcher: CoroutineDispatcher
) : WebSocketListener() {

    // previous
    val messageSubject: PublishSubject<WsMsg> = PublishSubject.create()

    // new
    val messageChannel: Channel<WsMsg> by lazy { Channel() }

    override fun onMessage(webSocket: WebSocket, text: String) {
        super.onMessage(webSocket, text)

        // previous
        messageSubject.onNext(text)

        // new
        runBlocking(ioDispatcher) {
            Log.d(TAG, "message: $text")
            messageChannel.send(text)
        }
    }
}
But the coroutine channel doesn't work. It receives the first message and prints the log once, but nothing is logged for the second message or any message after it.
When I change the code as shown below, it works:
override fun onMessage(webSocket: WebSocket, text: String) {
    super.onMessage(webSocket, text)
    GlobalScope.launch(ioDispatcher) {
        Log.d(TAG, "message: $text")
        messageChannel.send(text)
    }
}
The difference is runBlocking vs GlobalScope.launch.
I heard that GlobalScope may not preserve the order of the messages, so it is not suitable for a chat app.
How can I solve this issue?
The default Channel() has no buffer, which causes send(message) to suspend until a consumer of the channel calls channel.receive() (which is implicitly done in a for (element in channel) {} loop).
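To see that suspension in isolation, here is a minimal sketch that assumes only kotlinx.coroutines is on the classpath. The helper sendCompletes and the 200 ms timeout are made up for illustration and are not part of your code. It tries to send on a default (rendezvous) channel once without a consumer and once with a consumer iterating the channel:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: returns true if send() completed within the timeout,
// false if it was still suspended when the timeout fired.
suspend fun sendCompletes(channel: Channel<String>, timeoutMs: Long = 200): Boolean =
    withTimeoutOrNull(timeoutMs) {
        channel.send("hello")
        true
    } ?: false

fun main() = runBlocking {
    // No receiver: send() on an unbuffered channel stays suspended until the timeout.
    val noReceiver = Channel<String>()
    println("without receiver: " + sendCompletes(noReceiver))

    // A consumer iterating the channel calls receive() implicitly, so send() can complete.
    val withReceiver = Channel<String>()
    val consumer = launch { for (element in withReceiver) println("received: $element") }
    println("with receiver: " + sendCompletes(withReceiver))

    withReceiver.close() // ends the for loop so runBlocking can return
    consumer.join()
}
```

The first call should report that send() did not complete, while the second should, because the for loop is there to take the element.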
Since you are using runBlocking, suspending effectively means blocking the current thread. It appears that OkHttp always delivers messages on the same thread, so it cannot deliver the second message while that thread is still blocked by the first send.
The correct solution would be to add a buffer to your channel. If it is unlikely that messages will pour in faster than you can process them, you can simply replace Channel() with Channel(Channel.UNLIMITED).
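As a rough sketch of what that could look like, the example below uses a hypothetical MessageSource class as a stand-in for your listener so that it runs without OkHttp. Note that it calls trySend instead of runBlocking { send() }; that is a substitution on my part, based on the assumption that with an unlimited buffer there is nothing to wait for, so a non-suspending call from the callback is enough:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the listener; the real one would extend okhttp3.WebSocketListener.
class MessageSource {
    // Unlimited buffer: sending never has to wait for a consumer.
    val messageChannel = Channel<String>(Channel.UNLIMITED)

    // Mirrors onMessage: a plain, non-suspending callback.
    fun onMessage(text: String) {
        // trySend does not suspend; on an unlimited channel it only fails once the channel is closed.
        messageChannel.trySend(text)
    }
}

// Feeds all messages through the callback first, then drains the channel.
fun collectInOrder(messages: List<String>): List<String> = runBlocking {
    val source = MessageSource()
    messages.forEach(source::onMessage) // no consumer yet, and none of these calls block
    source.messageChannel.close()       // buffered elements can still be received after close
    val received = mutableListOf<String>()
    for (message in source.messageChannel) received += message
    received
}

fun main() {
    println(collectInOrder(listOf("first", "second", "third")))
}
```

Because a channel hands elements out in the order they were sent, and every message goes into it directly from the callback thread rather than through separately launched coroutines, this should also address the ordering concern you had with GlobalScope.launch.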