kotlinkotlin-coroutineskotlin-flowkotlin-stateflowkotlin-sharedflow

Flow.stateIn() not receiving new value from it's source


I tried to combine several SharedFlow into one StateFlow using stateIn. But my StateFlow seems not updating after I emit new value to it's SharedFlow source. I found out that the problem is from how I used stateIn.

Here is the simplified code I am using (you can run it from kotlin playground).

import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch

fun main() = runBlocking {
    val sourceFlow = MutableSharedFlow<Int>()
    val stateFlow = sourceFlow.stateIn(GlobalScope, SharingStarted.Lazily, 0)

    val job = launch { stateFlow.collect() }

    sourceFlow.emit(99)
    println(stateFlow.value)

    job.cancel()
}

println(stateFlow.value) prints out 0 when it should prints 99. I already follow through this documentation about stateIn but still can not find the issue. Someone knows where I did wrong?


Solution

  • The short answer is that no collection has started when 99 is emitted, and thus the value is simply discarded. This is the expected behaviour of unbuffered shared flow described in the docs of SharedFlow:

    A default implementation of a shared flow that is created with MutableSharedFlow() constructor function without parameters has no replay cache nor additional buffer. emit call to such a shared flow suspends until all subscribers receive the emitted value and returns immediately if there are no subscribers.

    The reason there are no subscribers on sourceFlow (the shared flow) is because you called stateIn() on this flow with SharingStarted.Lazily:

    Sharing is started when the first subscriber appears and never stops

    That means the state flow's coroutine only starts collecting the source when the first subscriber starts collecting the state flow. And the reason there is no subscriber on the state flow is because your launch hasn't had time to call collect before you reach the emit.

    When you call launch, you're starting an asynchronous operation. The code inside the launch runs concurrently with the code that follows it. Since you're running this launch inside runBlocking's scope, it means it runs on the same thread as the rest of main's code, which means that the body of the launch cannot start running until the main function's body actually suspends. And here it only suspends when reaching emit().