I have a long-running job (a network call) inside a map, which delays subsequent emissions until the previous map call has finished. mapLatest is not what I want because it cancels previous executions. Is there a way to run the code inside a map in parallel, as soon as each value is emitted?
A snippet of code will probably explain the idea better:
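import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main() = coroutineScope {
    val flow = flow {
        repeat(4) {
            emit(it) // fast emission
        }
    }.map {
        println("Loading")
        delay(1000) // this could be a network call
        println("I'm slow")
        it // pass the value through unchanged
    }
    // Each value is printed only after its one-second map step has finished.
    flow.collect {
        println("HEY $it")
    }
}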
Unfortunately, Kotlin flows don't provide a built-in operator for concurrent mapping at the time of writing this answer (Kotlin 1.8.20). It has been planned and discussed since 2019, but it has not been implemented yet.
Of course, with the full power of coroutines, we can implement it ourselves. There are multiple approaches: we can use flatMapMerge(), which already merges inner flows concurrently; we can create a flow of deferred values and process them in async-await style; but for me the best solution is to use channelFlow() and manually launch coroutines to process the data:
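import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

inline fun <T, R> Flow<T>.concurrentMap(crossinline transform: suspend (T) -> R): Flow<R> = channelFlow {
    collect {
        // One coroutine per upstream item; results are sent as they complete.
        launch { send(transform(it)) }
    }
}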
The implementation is really simple: for each item collected from the upstream flow, we launch a new coroutine that transforms the item and sends the result to the downstream flow. A nice thing about this solution is that it uses the dispatcher of the flow collector, so it behaves similarly to other flow operators.
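To see the effect on the example from the question, here is a minimal sketch that swaps map for concurrentMap and measures the total time. The operator is repeated so the snippet compiles on its own; runDemo is a hypothetical helper name introduced only for this illustration, and runBlocking is used instead of the question's coroutineScope purely for convenience.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Same operator as in the answer, repeated so this snippet is self-contained.
inline fun <T, R> Flow<T>.concurrentMap(crossinline transform: suspend (T) -> R): Flow<R> = channelFlow {
    collect { launch { send(transform(it)) } }
}

// Hypothetical helper: runs the question's pipeline with concurrentMap
// and returns the collected values together with the elapsed milliseconds.
fun runDemo(): Pair<List<Int>, Long> = runBlocking {
    var results = emptyList<Int>()
    val elapsed = measureTimeMillis {
        results = flow { repeat(4) { emit(it) } } // fast emission
            .concurrentMap {
                delay(1000) // stands in for the network call
                it
            }
            .toList()
    }
    results to elapsed
}

fun main() {
    val (results, elapsed) = runDemo()
    // The order of results is not guaranteed; see the note on ordering.
    println("results=$results elapsed=${elapsed}ms")
}
```

Because all four delays overlap, the whole pipeline should finish in roughly one second rather than the four seconds the sequential map needs.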
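Please note that this solution doesn't preserve the original ordering of items and doesn't provide backpressure. Even if the downstream collector is slow to consume items from the resulting flow, the operator keeps collecting upstream items and launching a transform for each of them, so the number of coroutines in flight is unbounded.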
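If either limitation matters, the two alternative approaches mentioned earlier can be sketched as follows. This is only an illustration under assumptions, not the implementation from the linked issue: the names concurrentMapBounded and concurrentMapOrdered and their parameters are hypothetical. The first variant relies on flatMapMerge to cap the number of concurrent transforms; the second sends Deferred values through a bounded buffer and awaits them in upstream order.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Unordered variant: flatMapMerge collects at most `concurrency` inner flows at once,
// so at most that many transforms run at the same time.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun <T, R> Flow<T>.concurrentMapBounded(concurrency: Int, transform: suspend (T) -> R): Flow<R> =
    flatMapMerge(concurrency) { value -> flow { emit(transform(value)) } }

// Ordered variant: each transform starts in async, and the Deferred values are awaited
// in upstream order. The buffer size limits how far upstream collection can run ahead.
fun <T, R> Flow<T>.concurrentMapOrdered(buffer: Int, transform: suspend (T) -> R): Flow<R> =
    channelFlow<Deferred<R>> {
        collect { value -> send(async { transform(value) }) }
    }.buffer(buffer).map { it.await() }

fun main() = runBlocking {
    val source = flowOf(3, 1, 2)
    // Shorter delays finish first, so the unordered variant may reorder items.
    val unordered = source.concurrentMapBounded(2) { delay(it * 100L); it }.toList()
    val ordered = source.concurrentMapOrdered(2) { delay(it * 100L); it }.toList()
    println("unordered=$unordered ordered=$ordered")
}
```

The ordered variant trades some throughput for ordering: a slow item at the head of the buffer holds back faster items behind it, even though their transforms may already have finished.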
You can read more about various implementations (ordered, with backpressure, etc.) and future plans here: https://github.com/Kotlin/kotlinx.coroutines/issues/1147