multithreading kotlin concurrency kotlin-coroutines kotlin-flow

Flow producers/builders from multiple coroutines/threads


I have a few producers, each emitting data to its own flow. I then merge them into a single flow, which further processes the values and consumes them:

fun makeInputFlow() = flow {
    while (shouldMakeRequest()) {
        // make (possibly blocking) network request
        // do some CPU-intensive operations
        results.forEach { emit(it) }
        yield()
    }
}

suspend fun runFlows() {
    val flow1 = makeInputFlow()
    val flow2 = makeInputFlow()
    val flow3 = makeInputFlow()

    val merged = listOf(flow1, flow2, flow3)
        .merge()
    merged.collect { println(it) }
}

This works, but it seems that only one of the flows actually produces values at a time. The CPU is idle most of the time.

Is there a way to safely run each of the flows in its own coroutine/thread, so that all the flows produce values in parallel?


Solution

  • makeInputFlow() creates a cold flow. That is a flow that doesn't do any work on its own; it needs the collector's coroutine to run. [1]
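To illustrate what "cold" means here, the following minimal sketch counts how often a flow builder block is started. The helper name builderRunsAfter is hypothetical and not part of the question's code; it assumes kotlinx.coroutines is on the classpath.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns how many times the builder block was started
// after collecting the same cold flow `collections` times.
suspend fun builderRunsAfter(collections: Int): Int {
    var builderRuns = 0
    val cold = flow {
        builderRuns++ // executed once per collection, never before
        emit("a")
        emit("b")
    }
    repeat(collections) { cold.toList() } // toList() is a terminal operator
    return builderRuns
}

fun main() = runBlocking {
    println(builderRunsAfter(0)) // 0: creating the flow runs nothing
    println(builderRunsAfter(2)) // 2: each collection restarts the block
}
```

The builder block only executes inside a collecting coroutine, which is why the dispatcher of the collector decides where the producers run.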

    Since you only have a single coroutine collecting the three flows, they cannot run in parallel. Except... you call merge on the three flows. merge internally collects the flows and uses a separate coroutine for each flow. From the documentation:

    All flows are merged concurrently, without limit on the number of simultaneously collected flows.

    So collecting the merged flow from a single coroutine should still work in parallel, since internally each of the three flows is executed in a separate coroutine.

    The reason it still does not is that your code currently runs on the Main dispatcher, which has only a single thread. Running the three flows in separate coroutines doesn't execute them in parallel, because there are no other threads to distribute them to.
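The effect can be observed with a small sketch that records which thread each value is produced on. It uses runBlocking as a stand-in for a single-threaded dispatcher (an assumption; the question does not show how runFlows is called), and threadReportingFlow and producerThreads are hypothetical names.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.yield

// Hypothetical stand-in for makeInputFlow(): emits the thread each value is produced on.
fun threadReportingFlow(emissions: Int = 3) = flow {
    repeat(emissions) {
        emit(Thread.currentThread())
        yield()
    }
}

// Merges three producers and returns the distinct threads they ran on.
suspend fun producerThreads(): Set<Thread> =
    List(3) { threadReportingFlow() }.merge().toList().toSet()

fun main() = runBlocking {
    // runBlocking confines its coroutines to the calling thread, so the three
    // producers interleave but never run at the same time.
    println("runBlocking: ${producerThreads().size} thread(s)")

    // On a multi-threaded dispatcher the producers may be spread over several
    // threads; the exact count depends on scheduling and the number of cores.
    val pooled = withContext(Dispatchers.Default) { producerThreads() }
    println("Dispatchers.Default: ${pooled.size} thread(s)")
}
```

In the single-threaded case the set contains exactly one thread, even though merge runs each flow in its own coroutine.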

    That can be easily remedied, though: Just switch runFlows to another dispatcher that has multiple threads:

    suspend fun runFlows() = withContext(Dispatchers.IO) {
        // ...
    }
    

    The IO dispatcher has a lot of threads, but they are intended to spend most of their time waiting, as is the case for I/O like network or filesystem access. For CPU-heavy operations you should choose Dispatchers.Default instead. It has as many threads as the CPU has cores (with a minimum of two), so it can efficiently max out the hardware.
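Since the flows in the question mix a blocking request with CPU-intensive work, one possible variation is to use both dispatchers: the blocking call goes to Dispatchers.IO and the rest of the builder runs on Dispatchers.Default via flowOn. This is only a sketch under assumptions: fetchBlocking and process are hypothetical stand-ins for the elided parts of the question, and attaching the dispatcher with flowOn is an alternative to wrapping runFlows in withContext.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for the blocking network request in the question.
fun fetchBlocking(producer: Int, round: Int): List<Int> {
    Thread.sleep(50) // simulates blocking I/O
    return listOf(producer * 10 + round)
}

// Hypothetical stand-in for the CPU-intensive step.
fun process(value: Int): Int = value * value

fun makeInputFlow(producer: Int, rounds: Int = 2): Flow<Int> = flow {
    repeat(rounds) { round ->
        // The blocking call is moved to the IO pool. emit() must not be
        // called inside withContext, so only the request happens there.
        val raw = withContext(Dispatchers.IO) { fetchBlocking(producer, round) }
        raw.map(::process).forEach { emit(it) }
    }
}.flowOn(Dispatchers.Default) // the builder body (CPU work) runs on Default

suspend fun runFlows(): List<Int> =
    listOf(makeInputFlow(1), makeInputFlow(2), makeInputFlow(3))
        .merge()
        .toList()

fun main() = runBlocking {
    // The arrival order of merged values is not deterministic, hence sorted().
    println(runFlows().sorted()) // [100, 121, 400, 441, 900, 961]
}
```

With flowOn each flow carries its own dispatcher, so the producers can run in parallel regardless of which dispatcher the collector uses.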


    [1] In contrast to a hot SharedFlow, which will run in its own coroutine, independent of how (or even if) it is collected.