kotlin callback kotlin-coroutines dispatcher

Convert a callback to a coroutine when the initial call must be made on the main thread


I would like to convert a callback-based API into a suspend function. The SDK states that the API call needs to be made on the main thread. My solution works, but I am not confident that it is correct in principle.

It looks like this roughly:

    override suspend fun registerUser(email: String): ResultHandler<Profile, Exception> {
        return suspendCancellableCoroutine { continuation ->
            val observer = object : Observer<RegisterResponse<Profile?>> {
                // Other Observer callbacks omitted for brevity.
                override fun onNext(t: Profile) {
                    continuation.resume(Success(t))
                }
            }

            // Launched on Main because the SDK requires the call on the main thread.
            CoroutineScope(Dispatchers.Main).launch {
                userManager.register(email, observer)
            }
        }
    }

It seems to me that the SDK wants to invoke the observer callback on the main thread, but my code is launched from a view model scope on the IO dispatcher (to avoid blocking the main thread). So I guess the observer is, in practice, running on an IO thread.

Thoughts on how to approach this?


Solution

  • Just to get this out of the way: if this library gave you an ObservableSource reference instead of making you pass it an Observer, you could call awaitFirst() on it, which would be simpler than implementing this yourself.


    Avoid CoroutineScope(Dispatchers.Main).launch. It is essentially no different from GlobalScope.launch(Dispatchers.Main): it creates an unbound scope that is never cancelled, which is a common source of memory leaks. If the coroutine calling this suspend function is cancelled, the coroutine you launched is not a child of it, so it is neither notified nor cancelled.

    Secondly, the calling coroutine does not wait for the launched one: launch returns immediately, so userManager.register is only called at some later point, whenever the detached coroutine gets to run.
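The two points above can be seen in a small sketch. It is not the asker's code: the function name demoUnboundScope is hypothetical, and it uses Dispatchers.Default in place of Dispatchers.Main so that it runs on a plain JVM with only kotlinx-coroutines-core. It launches one structured child and one coroutine in a fresh CoroutineScope, cancels the caller, and reports which of the two was cancelled.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns (childWasCancelled, detachedWasCancelled) after the caller is cancelled.
fun demoUnboundScope(): Pair<Boolean, Boolean> = runBlocking {
    val jobs = CompletableDeferred<Pair<Job, Job>>()

    val caller = launch(Dispatchers.Default) {
        // Structured: a child of `caller`, so it is cancelled together with it.
        val child = launch { delay(500) }

        // Detached: a brand-new scope with its own Job, unrelated to `caller`.
        val detached = CoroutineScope(Dispatchers.Default).launch { delay(500) }

        jobs.complete(child to detached)
        delay(10_000) // simulate the caller still waiting for a result
    }

    val (child, detached) = jobs.await()
    caller.cancelAndJoin() // e.g. what happens when a view model scope is cleared
    detached.join()        // the detached coroutine keeps running to completion

    child.isCancelled to detached.isCancelled
}

fun main() {
    val (childCancelled, detachedCancelled) = demoUnboundScope()
    println("child cancelled: $childCancelled, detached cancelled: $detachedCancelled")
}
```

The child is cancelled along with its parent, while the coroutine started through CoroutineScope(...) runs to the end regardless of what happened to the caller.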

    To make sure the registration call happens on the main thread, wrap the whole function body in withContext(Dispatchers.Main). The suspendCancellableCoroutine lambda then runs on the main thread, so the API registration function is called on the main thread as well, and the call remains part of the calling coroutine.

    One more point about implementing this: the Kotlin coroutines library is open source, so you can read how it implements the await functions for Observable as an example of how to do this kind of thing properly.

    The solution should look something like:

    // Note: resume here is the extension function kotlin.coroutines.resume, which must be imported.
    override suspend fun registerUser(email: String): ResultHandler<Profile, Exception> = withContext(Dispatchers.Main) {
        // This block runs on the main thread, so register() below is called there too.
        suspendCancellableCoroutine { continuation ->
            val observer = object : Observer<RegisterResponse<Profile?>> {
                lateinit var subscription: Disposable
                var seenValue = false

                override fun onSubscribe(disposable: Disposable) {
                    subscription = disposable
                    // Dispose the subscription if the calling coroutine is cancelled.
                    continuation.invokeOnCancellation { subscription.dispose() }
                }

                override fun onNext(t: Profile) {
                    // Only the first value resumes the coroutine; later values are ignored.
                    if (!seenValue) {
                        seenValue = true
                        continuation.resume(Success(t))
                        subscription.dispose()
                    }
                }

                override fun onComplete() {
                    // Completing without any value is reported as an error result.
                    if (continuation.isActive && !seenValue) {
                        continuation.resume(Error(NoSuchElementException("Observer completed without emitting any value.")))
                    }
                }

                override fun onError(throwable: Throwable) {
                    if (continuation.isActive) continuation.resume(Error(throwable))
                }
            }
            userManager.register(email, observer)
        }
    }
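
To try the same pattern outside Android, here is a minimal self-contained sketch. Everything SDK-related in it is a hypothetical stand-in: ProfileCallback, FakeUserManager, Outcome and the "fake-main" single-thread dispatcher are assumptions made for illustration, not the real SDK or Dispatchers.Main. It only needs kotlinx-coroutines-core and checks that the registration call really happens on the designated thread while the caller runs elsewhere.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors
import kotlin.concurrent.thread
import kotlin.coroutines.resume

// Hypothetical result type, standing in for the asker's ResultHandler.
sealed class Outcome
data class Success(val profile: String) : Outcome()
data class Failure(val error: Exception) : Outcome()

// Hypothetical callback interface, standing in for the SDK's Observer.
interface ProfileCallback {
    fun onSuccess(profile: String)
    fun onFailure(error: Exception)
}

// Hypothetical SDK: records the calling thread and answers from a background thread.
class FakeUserManager {
    @Volatile var registeredOnThread: String? = null

    fun register(email: String, callback: ProfileCallback) {
        registeredOnThread = Thread.currentThread().name
        thread { callback.onSuccess("profile-for-$email") }
    }

    fun unregister(callback: ProfileCallback) { /* nothing to clean up in this fake */ }
}

suspend fun registerUser(
    manager: FakeUserManager,
    email: String,
    mainLike: CoroutineDispatcher, // stand-in for Dispatchers.Main
): Outcome = withContext(mainLike) {
    suspendCancellableCoroutine { continuation ->
        val callback = object : ProfileCallback {
            override fun onSuccess(profile: String) {
                if (continuation.isActive) continuation.resume(Success(profile))
            }

            override fun onFailure(error: Exception) {
                if (continuation.isActive) continuation.resume(Failure(error))
            }
        }
        // Clean up if the calling coroutine is cancelled while waiting.
        continuation.invokeOnCancellation { manager.unregister(callback) }
        manager.register(email, callback) // runs on the mainLike thread
    }
}

// Calls registerUser from Dispatchers.Default and reports where register() ran.
fun runDemo(): Pair<Outcome, String?> {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "fake-main") }
    return executor.asCoroutineDispatcher().use { mainLike ->
        val manager = FakeUserManager()
        val outcome = runBlocking(Dispatchers.Default) {
            registerUser(manager, "a@example.com", mainLike)
        }
        outcome to manager.registeredOnThread
    }
}

fun main() {
    val (outcome, threadName) = runDemo()
    println("outcome = $outcome, register() ran on $threadName")
}
```

In an Android view model, the caller would typically be a coroutine started with viewModelScope.launch, so cancelling that scope cancels the pending registration as well.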