I would like to convert a callback to co-routine, and the SDK states that the API call needs to be made on the main thread. The solution works, but I am not confident that it is correct in principle.
It looks like this roughly:
override suspend fun registerUser(email: String): ResultHandler<Profile, Exception> {
return suspendCancellableCoroutine { continuation ->
val observer =
object : Observer<RegisterResponse<Profile?>> {
fun onNext(t: Profile) {
continuation.resume(Success(t))
}
CoroutineScope(Dispatchers.Main).launch {
userManager.register(email, observer)
}
}
}
It seems to me that the SDK wants to invoke the observer callback on the Main thread, but my process is triggered in a view model scope on the IO thread (to avoid blocking main). So the observer I guess is in practice running on the IO thread.
Thoughts on how to approach this?
Just to get this out of the way, if this library provided you an ObservableSource reference instead of making you pass it an Observer, you could use awaitFirst()
on it, which would of course be simpler than implementing this yourself.
Avoid doing this: CoroutineScope(Dispatchers.Main).launch
which is essentially no different than using GlobalScope.launch(Dispatchers.Main)
. It creates an unbound (never cancelled) scope, which is a common source of memory leaks. If the coroutine that is calling this suspend function is cancelled, the other coroutine you have launched will not be notified and cancelled, since it is not a child.
Secondly, the other coroutine does not wait for it--the events of the inner coroutine can come some time in the future.
To ensure you register your API on the main thread, use a withContext(Dispatchers.Main)
call around this whole function. Then, the suspendCancellableCoroutine
lambda block will run on the main thread, so you'll be calling the API registration function on the main thread.
Some other points about implementing this:
onSubscribe
function that hands you a Disposable that you can use to cancel early. You need to do this to support cancellation.continuation.resume()
more than once will crash your coroutine, so you need some safeguards just in case the API surprises you and emits more than one item.onError
, I also check continuation.isActive
to avoid a multiple-resume crash in the possible case of a single item being emitted followed by an error occurring before the subscription ends.Since the Kotlin coroutines library is open source, you can see how they implemented Observable.await
here for an example of how to do this kind of thing properly.
The solution should look something like:
override suspend fun registerUser(email: String): ResultHandler<Profile, Exception> = withContext(Dispatchers.Main) {
suspendCancellableCoroutine { continuation ->
val observer = object : Observer<RegisterResponse<Profile?>> {
lateinit var subscription: Disposable
var seenValue = false
override fun onSubscribe(disposable: Disposable) {
subscription = disposable
continuation.invokeOnCancellation { subscription.dispose() }
}
override fun onNext(t: Profile) {
if (!seenValue) {
seenValue = true
continuation.resume(Success(t))
subscription.dispose()
}
}
override fun onComplete() {
if (continuation.isActive && !seenValue) {
continuation.resume(Error(NoSuchElementException("Observer completed without emitting any value.")))
}
}
override fun onError(throwable: Throwable) {
if (continuation.isActive) continuation.resume(Error(throwable))
}
}
userManager.register(email, observer)
}
}