kotlinkotlin-flowturbine

How can I test a Kotlin flow builder with delay() calls?


I have a piece of code in an Android Kotlin project similar to below, where I use the flow builder method to generate an infinite loop of periodic emissions:

fun doSomething(): Flow<Int> = flow {
 var i = 0
  while (true) {
    emit(i++)
    delay(5000L)
  }
}

I am then trying to unit test this flow using the (very useful!) Turbine library, as below:

@Test
fun myTest() = runTest {
  doSomething().test {
    assertEquals(expected = 0, actual = awaitItem())
    assertEquals(expected = 1, actual = awaitItem())
    assertEquals(expected = 2, actual = awaitItem())
    cancelAndIgnoreRemainingEvents()
  }
}

My understanding of the runTest method from the coroutines-test library is that it automatically skips any calls to delay within that scope, but the above test is failing with a timeout exception. I've tried littering log statements throughout, and only the first assertEquals call is being triggered. The latter two are never reached because the call to delay from within the flow is apparently blocking the test.

Is this expected behaviour when working with a Flow builder? If so, is there any way of controlling the passage of time in my scenario?


Solution

  • It looks like this is a bug in Turbine 0.8.0 and 0.9.0-SNAPSHOT. It doesn't inherit the test configuration, which can skip the delays. See the issue here: https://github.com/cashapp/turbine/issues/84

    If you want to test now, then you can do so manually.

    import kotlin.test.*
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import kotlinx.coroutines.test.runTest
    
    @OptIn(ExperimentalCoroutinesApi::class)
    class DelayTest {
    
        fun doSomething(): Flow<Int> = flow {
            var i = 0
            while (true) {
                println("${System.currentTimeMillis()} emitting $i")
                emit(i++)
                delay(5000L)
            }
        }
    
        @Test
        fun testFlow() = runTest {
            val result = doSomething().take(3).toList()
    
            assertEquals(listOf(0, 1, 2), result)
        }
    }
    

    Or, as a workaround, you can make the flow use the test dispatcher using flowOn(). This changes the context upstream and does not "leak downstream".

        @Test
        fun testFlowOnWorkaround() = runTest {
            val testFlow = doSomething()
                .flowOn(UnconfinedTestDispatcher(testScheduler))
    
            testFlow.test {
                assertEquals(expected = 0, actual = awaitItem())
                assertEquals(expected = 1, actual = awaitItem())
                assertEquals(expected = 2, actual = awaitItem())
                cancelAndIgnoreRemainingEvents()
            }
        }
    

    As another workaround you can first collect the flow normally to a list. Thanks to runTest{} the delays are skipped. Then you can transform it back into a flow, and test that flow.

        @Test
        fun testFlowToListWorkaround() = runTest {
            val myFlow = doSomething().take(3).toList().asFlow()
    
            myFlow.test {
                assertEquals(expected = 0, actual = awaitItem())
                assertEquals(expected = 1, actual = awaitItem())
                assertEquals(expected = 2, actual = awaitItem())
                cancelAndIgnoreRemainingEvents()
            }
        }
    

    Versions: