android kotlin kotlin-coroutines kotlin-stateflow kotlin-sharedflow

How does this change from runBlocking to SharedFlow work?


I would like to ask why this works. Normally, when I used collectLatest with a flow, my data wasn't collected in time and the return value was empty. I had to use async-await coroutines, but I have read that this blocks the main thread, so it is not efficient. I did some research and found a solution using a SharedFlow.

Previously:

suspend fun getList(): List<Items> {
    CoroutineScope(Dispatchers.Main).launch {
        async {
            flow.collectLatest {
                myItems = it
            }
        }.await()
    }
    return myItems
}

Or, without async-await, it returns an empty list.

Now:

suspend fun getList(): List<Items> {
    val sharedFlow = flow.conflate().shareIn(
         coroutineScopeIO,
         replay = 1,
         started = SharingStarted.WhileSubscribed()
    )
    return sharedFlow.first()
}

According to the documentation, conflate means:

Conflates flow emissions via conflated channel and runs collector in a separate coroutine. The effect of this is that emitter is never suspended due to a slow collector, but collector always gets the most recent value emitted.

I'm not sure I understand it correctly. When I conflate the flow, do I just create a separate coroutine that emits the values my other function receives, in my example via shareIn().first()? And since the call on the sharedFlow variable suspends, does it give the same effect as my async-await version, except that it does not block the main thread but only the parent coroutine or suspend function it is called from?

SharingStarted.WhileSubscribed(): as I understand it, this just means emission starts when there is a subscriber.


Solution

  • conflate() has nothing to do with why this is working. The separate coroutine it talks about is run under the hood and you don't need to think about it. It's just to make sure your flow never causes the upstream emitter to have to wait for a slow collector, and your collector skips values if they are coming faster than it can handle them. conflate() makes it safe to have a slow collector without a buffer.
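
The skipping behaviour described above can be seen in a small sketch. It is not taken from the question's code: fastNumbers and collectSlowly are hypothetical names, and the 100 ms and 300 ms delays are arbitrary values chosen so that the collector is slower than the emitter.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical fast emitter: one value every 100 ms.
fun fastNumbers(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

// Hypothetical slow collector: spends 300 ms on every value it receives
// and returns the values it actually saw.
suspend fun collectSlowly(conflated: Boolean): List<Int> {
    val source = if (conflated) fastNumbers().conflate() else fastNumbers()
    return source.onEach { delay(300) }.toList()
}

fun main() = runBlocking {
    // Without conflate() the emitter waits for the collector, so nothing is skipped.
    println("plain:     ${collectSlowly(conflated = false)}")
    // With conflate() the emitter never waits; intermediate values are dropped,
    // but the collector still receives the first and the most recent value.
    println("conflated: ${collectSlowly(conflated = true)}")
}
```

Which intermediate values get dropped in the conflated run depends on timing, so the sketch does not promise an exact list. Either way, none of this affects when getList() returns.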

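    In your first code block, you are launching a new coroutine in a new CoroutineScope, so it is not a child of the coroutine that called getList() and will not be waited for before the function returns. The async { }.await() inside it only makes that launched coroutine wait; getList() itself reaches return myItems immediately, while the list is still empty. (Incidentally, this new coroutine will only finish when the Flow completes, and most types of Flows never complete.)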

    In the second code block, you are calling first() on the Flow, which suspends and gets the next value emitted by the flow and then returns that value without waiting for the Flow to complete.
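
A minimal sketch of the difference between the two code blocks, under some assumptions: itemsFlow is a hypothetical stand-in for the question's flow (it emits one list after 100 ms and then never completes), List<String> replaces List<Items>, and Dispatchers.Default replaces Dispatchers.Main so that the sketch runs outside Android.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the question's flow: emits once, never completes.
fun itemsFlow(): Flow<List<String>> = flow {
    delay(100)
    emit(listOf("a", "b"))
    awaitCancellation()
}

// Mirrors the first code block: the collection runs in an unrelated scope,
// so this function does not wait for it.
suspend fun getListWithLaunch(): List<String> {
    var myItems = emptyList<String>()
    val scope = CoroutineScope(Dispatchers.Default) // assumption: Default instead of Main
    scope.launch {
        itemsFlow().collectLatest { myItems = it }
    }
    val result = myItems // read right away, before the flow has emitted anything
    scope.cancel()       // cleanup for the sketch only; the question's code never cancels
    return result
}

// Mirrors the second code block: first() suspends the caller until one value arrives.
suspend fun getListWithFirst(): List<String> = itemsFlow().first()

fun main() = runBlocking {
    println(getListWithLaunch()) // prints []
    println(getListWithFirst())  // prints [a, b]
}
```

Note that first() suspends only the calling coroutine; it does not block the thread that coroutine runs on.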


    Some other notes:

    suspend fun getList(): List<Items> {
        return flow.first()
    }