kotlinkotlin-coroutines

Kotlin Flow: How to unsubscribe/stop


Update Coroutines 1.3.0-RC

Working version:

@FlowPreview
suspend fun streamTest(): Flow<String> = channelFlow {
    listener.onSomeResult { result ->
        if (!isClosedForSend) {
            offer(result)
        }
    }

    awaitClose {
        listener.unsubscribe()
    }
}

Also checkout this Medium article by Roman Elizarov: Callbacks and Kotlin Flows

Original Question

I have a Flow emitting multiple Strings:

@FlowPreview
suspend fun streamTest(): Flow<String> = flowViaChannel { channel ->
    listener.onSomeResult { result ->
            if (!channel.isClosedForSend) {
                channel.sendBlocking(result)
            }
    }
}

After some time I want to unsubscribe from the stream. Currently I do the following:

viewModelScope.launch {
    beaconService.streamTest().collect {
        Timber.i("stream value $it")
        if(it == "someString")
            // Here the coroutine gets canceled, but streamTest is still executed
            this.cancel() 
    }
}

If the coroutine gets canceled, the stream is still executed. There is just no subscriber listening to new values. How can I unsubscribe and stop the stream function?


Solution

  • A solution is not to cancel the flow, but the Job returned from launching the flow in a scope.

    val job = scope.launch { flow.cancellable().collect { } }
    job.cancel()
    

    NOTE: cancellable() will ensure the flow is terminated before new items are emitted to collect { } if its job is cancelled, though flow builder and all implementations of SharedFlow are cancellable() by default.