androidkotlinkotlin-coroutinesandroid-viewmodelkotlin-flow

How do you cancel collection and/or onEach for a Kotlin coroutine Flow?


I have the following common code for "subscribing" to a Kotlin coroutine Flow that is used throughout my Android application:

fun <T> subscribe(
    myFlow: Flow<Conclusion<T>>,
    success: suspend (T) -> Unit,
    error: suspend (Throwable?) -> Unit,
) = collect(myFlow) { conclusion ->
    conclusion
        .onSuccess { success -> success(success) }
        .onError { error -> error(error) }
}

protected fun <T> collect(flow: Flow<T>, onEach: suspend (T) -> Unit) {
    viewModelScope.launch {
        withContext(Dispatchers.IO) {
            flow.filterNotNull()
                .onEach { onEach(it) }
                .catch { onError(it) }
                .launchIn(this)
        }
    }
}

Currently this common code does not support cancellation.

My initial attempt to fix this is as follows:

fun <T> subscribe(
    myFlow: Flow<Conclusion<T>>,
    success: suspend (T) -> Unit,
    error: suspend (Throwable?) -> Unit,
) = collect(myFlow.cancellable()) /* CHANGE HERE 1 */ { conclusion ->
    conclusion
        .onSuccess { success -> success(success) }
        .onError { error -> error(error) }
}

protected fun <T> collect(flow: Flow<T>, onEach: suspend (T) -> Unit) {
    viewModelScope.launch {
        withContext(Dispatchers.IO) {
            var theJobToCancel: Job? = currentCoroutineContext().job /* CHANGE HERE 2 */
            flow.filterNotNull()
                .onEach {
                    currentCoroutineContext().ensureActive() /* CHANGE HERE 3 */
                    onEach(it)
                }
                .catch { onError(it) }
                .launchIn(this)
        }
    }
}

I now need to be able to communicate with the collect function to trigger cancellation of the active collection process via onEach.

theJobToCancel is the job I need to cancel, however I cannot see how to achieve this.

How can I cancel onEach via theJobToCancel for out side this common code block?


Solution

  • This is not how Flows should be used in view models.

    The view model's job is to prepare all data so it is ready for dispaly. That is done by transforming all underlying flows and convert the result into a StateFlow with stateIn that is then exposed as a property. Your UI then just collects this resulting flow. When this is done in a lifecycle-aware manner the StateFlow will be automatically subscribed and unsubscribed as needed. The underlying flows will be started and stopped accordingly. If you use Compose for your UI it is as simple as just calling collectAsStateWithLifecycle().

    Apart from StateFlows as properties the only other thing your view model should expose are simple functions that can be called by the UI to trigger some business logic. They have simple parameters (no function types, i.e. no callbacks) and have no return value (i.e. Unit). The effect of calling such a function will be reflected in a changed value of your StateFlows.

    With this in mind, there is no reason to have a subscribe function in the view model. Neither is it to provide a success or error callback. Also, you shouldn't call launchIn as you shouldn't collect flows in the view model, they should only be transformed.

    This is probably a larger refactoring of your code. But in the end you would have a much cleaner separation of concerns and less code (because subscription and unsubscription is done automatically).