I have a coroutine/flow problem that I'm trying to solve.
I have this method, getClosestRegion, that's supposed to do the following:
1. Attempt to connect to every region.
2. The first region to connect (I use launch to attempt to connect to all concurrently) should be returned, and the rest of the region requests should be cancelled.

This is what I currently have:
override suspend fun getClosestRegion(): Region {
    val regions = regionsRepository.getRegions()
    val firstSuccessResult = MutableSharedFlow<Region>(replay = 1)
    val scope = CoroutineScope(Dispatchers.IO)
    // Attempts to connect to every region until the first success
    scope.launch {
        regions.forEach { region ->
            launch {
                val retrofitClient = buildRetrofitClient(region.backendUrl)
                val regionAuthenticationAPI = retrofitClient.create(AuthenticationAPI::class.java)
                val response = regionAuthenticationAPI.canConnect()
                if (response.isSuccessful && scope.isActive) {
                    scope.cancel()
                    firstSuccessResult.emit(region)
                }
            }
        }
    }
    val result = withTimeoutOrNull(TimeUnit.SECONDS.toMillis(30)) { firstSuccessResult.first() }
    if (result != null)
        return result
    throw Exception("Failed to connect to any region")
}
Issues with current code:
1. I expected the first successful connection to cancel the scope (scope.cancel()), but in reality other regions that connect successfully AFTER the first one also emit a value to the flow (scope.isActive returns true).
2. I don't know how to throw an exception if all regions failed to connect, or after the 30-second timeout.
Also, I'm pretty new to Kotlin Flow and coroutines, so I don't know if creating a flow is really necessary here.
You don't need to create a CoroutineScope and manage it from within a coroutine. You can use the coroutineScope function instead.
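To illustrate the difference, here is a minimal sketch of a suspend function that runs its child coroutines inside coroutineScope { } rather than a hand-made CoroutineScope. The names probe and probeAll are hypothetical and only stand in for the per-region connectivity check; the delays simulate network latency.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a per-region connectivity check.
suspend fun probe(name: String, latencyMs: Long): String {
    delay(latencyMs) // simulate network latency
    return name
}

// coroutineScope { } provides a scope tied to the calling coroutine:
// it returns only after all children finish, and cancelling the caller
// cancels the children as well. No manual scope management is needed.
suspend fun probeAll(): List<String> = coroutineScope {
    val eu = async { probe("eu", 100) }
    val us = async { probe("us", 50) }
    listOf(eu.await(), us.await())
}

fun main() = runBlocking {
    println(probeAll()) // prints [eu, us]
}
```

Because the children belong to the caller's job hierarchy, nothing leaks if the caller is cancelled, which is not the case when a detached CoroutineScope(Dispatchers.IO) is created inside the function.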
I of course didn't test any of the below, so please excuse any syntax errors and omitted <types> that the compiler can't infer.
Here's how you might do it using a select clause, but I think it's kind of awkward:
// Needs kotlinx.coroutines.*, kotlinx.coroutines.selects.select,
// kotlinx.coroutines.selects.onTimeout and kotlin.time.Duration.Companion.seconds
override suspend fun getClosestRegion(): Region = coroutineScope {
    val regions = regionsRepository.getRegions()
    val result = select<Region?> {
        onTimeout(30.seconds) { null }
        for (region in regions) {
            launch {
                val retrofitClient = buildRetrofitClient(region.backendUrl)
                val regionAuthenticationAPI = retrofitClient.create(AuthenticationAPI::class.java)
                val response = regionAuthenticationAPI.canConnect()
                if (!response.isSuccessful) {
                    delay(30.seconds) // prevent this one from being selected
                }
            }.onJoin { region } // selected as soon as this job completes
        }
    }
    coroutineContext.cancelChildren() // Cancel any remaining jobs
    requireNotNull(result) { "Failed to connect to any region" }
}
Here's how you could do it with channelFlow:
override suspend fun getClosestRegion(): Region = coroutineScope {
    val regions = regionsRepository.getRegions()
    val flow = channelFlow {
        for (region in regions) {
            launch {
                val retrofitClient = buildRetrofitClient(region.backendUrl)
                val regionAuthenticationAPI = retrofitClient.create(AuthenticationAPI::class.java)
                val response = regionAuthenticationAPI.canConnect()
                if (response.isSuccessful) {
                    send(region)
                }
            }
        }
    }
    // The flow completes once every attempt has finished, so firstOrNull()
    // returns null if all regions fail before the timeout.
    val result = withTimeoutOrNull(30.seconds) {
        flow.firstOrNull()
    }
    coroutineContext.cancelChildren() // Cancel any remaining jobs
    requireNotNull(result) { "Failed to connect to any region" }
}
I think your MutableSharedFlow technique could also work if you dropped the isActive check and used coroutineScope { } and cancelChildren() like I did above. But it seems awkward to create a shared flow that isn't shared by anything (it's only used by the same coroutine that created it).
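For completeness, here is an untested-in-your-project sketch of what that MutableSharedFlow variant might look like. The Region class, the firstReachable name, and the canConnect parameter are hypothetical stand-ins for the question's types and Retrofit call, so that the snippet is self-contained.

```kotlin
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.time.Duration.Companion.seconds

// Hypothetical stand-in for the question's Region type.
data class Region(val name: String)

// canConnect is a hypothetical stand-in for the Retrofit canConnect() call.
suspend fun firstReachable(
    regions: List<Region>,
    canConnect: suspend (Region) -> Boolean,
): Region = coroutineScope {
    // replay = 1 keeps the first emission available even if it happens
    // before first() starts collecting.
    val firstSuccess = MutableSharedFlow<Region>(replay = 1)
    for (region in regions) {
        launch {
            if (canConnect(region)) firstSuccess.emit(region)
        }
    }
    val result = withTimeoutOrNull(30.seconds) { firstSuccess.first() }
    coroutineContext.cancelChildren() // stop the attempts still in flight
    requireNotNull(result) { "Failed to connect to any region" }
}

fun main() = runBlocking {
    val regions = listOf(Region("eu"), Region("us"), Region("ap"))
    val winner = firstReachable(regions) { region ->
        when (region.name) {
            "eu" -> { delay(300); true }  // reachable, but slow
            "us" -> { delay(100); true }  // reachable and fastest
            else -> { delay(50); false }  // unreachable
        }
    }
    println(winner) // prints Region(name=us)
}
```

One drawback of this variant compared with channelFlow: a shared flow never completes, so if every region fails, the function only gives up when the 30-second timeout expires.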