Tags: android, kotlin, kotlin-coroutines, kotlin-flow

How to handle race condition with Coroutines in Kotlin?


I have a coroutine/flow problem that I'm trying to solve.

I have a method, getClosestRegion, that's supposed to do the following:

  1. Attempt to connect to every region
  2. Return the first region that connects (I use launch to attempt all connections concurrently) and cancel the remaining region requests
  3. If all regions fail to connect, or after a 30-second timeout, throw an exception

This is what I currently have:

override suspend fun getClosestRegion(): Region {
        val regions = regionsRepository.getRegions()
        val firstSuccessResult = MutableSharedFlow<Region>(replay = 1)
        val scope = CoroutineScope(Dispatchers.IO)

        // Attempts to connect to every region until the first success
        scope.launch {
            regions.forEach { region ->
                launch {
                    val retrofitClient = buildRetrofitClient(region.backendUrl)
                    val regionAuthenticationAPI = retrofitClient.create(AuthenticationAPI::class.java)
                    val response = regionAuthenticationAPI.canConnect()
                    if (response.isSuccessful && scope.isActive) {
                        scope.cancel()
                        firstSuccessResult.emit(region)
                    }
                }
            }
        }

        val result = withTimeoutOrNull(TimeUnit.SECONDS.toMillis(30)) { firstSuccessResult.first() }
        if (result != null)
            return result
        throw Exception("Failed to connect to any region")
    }

Issues with current code:

  1. Once one region has connected successfully, I expect the rest of the requests to be cancelled (by scope.cancel()). In reality, other regions that connect successfully AFTER the first one also emit a value to the flow (scope.isActive still returns true for them)
  2. I don't know how to handle the race between the two failure cases: throwing an exception when all regions have failed to connect, or when the 30-second timeout expires, whichever happens first

Also, I'm pretty new to Kotlin Flow and coroutines, so I don't know whether creating a flow is really necessary here.


Solution

  • You don't need to create a CoroutineScope and manage it from within a coroutine. You can use the coroutineScope function instead.

    I of course didn't test any of the below, so please excuse syntax errors and omitted <types> that the compiler can't infer.

    Here's how you might do it using a select clause, but I think it's kind of awkward:

    override suspend fun getClosestRegion(): Region = coroutineScope {
        val regions = regionsRepository.getRegions()
        val result = select<Region?> {
            onTimeout(30.seconds) { null }
            for (region in regions) {
                launch {
                    val retrofitClient = buildRetrofitClient(region.backendUrl)
                    val regionAuthenticationAPI = retrofitClient.create(AuthenticationAPI::class.java)
                    val response = regionAuthenticationAPI.canConnect()
                    if (!response.isSuccessful) {
                        delay(30.seconds) // failed: stall so this job's onJoin is never selected before the timeout
                    }
                }.onJoin { region }
            }
        }
        coroutineContext.cancelChildren() // Cancel any launched jobs that are still running
        requireNotNull(result) { "Failed to connect to any region" }
    }
    

    Here's how you could do it with channelFlow:

    override suspend fun getClosestRegion(): Region = coroutineScope {
        val regions = regionsRepository.getRegions()
        val flow = channelFlow {
            for (region in regions) {
                launch {
                    val retrofitClient = buildRetrofitClient(region.backendUrl)
                    val regionAuthenticationAPI = retrofitClient.create(AuthenticationAPI::class.java)
                    val result = regionAuthenticationAPI.canConnect()
                    if (result.isSuccessful) {
                        send(region)
                    }
                }
            }
        }
        val result = withTimeoutOrNull(30.seconds) { 
            flow.firstOrNull()
        }
        coroutineContext.cancelChildren() // Cancel any launched jobs that are still running
        requireNotNull(result) { "Failed to connect to any region" }
    }
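
    One thing the snippet above does not cover: a Retrofit suspend call that cannot reach the server throws an IOException instead of returning an unsuccessful response, and an exception in one launched child fails the whole channelFlow. Below is a minimal, self-contained sketch of the same idea with that case handled. FakeRegion, probe and closestRegion are hypothetical stand-ins for Region, canConnect() and getClosestRegion(), and the timeout is passed in milliseconds only to keep the example short.

```kotlin
import java.io.IOException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for Region: the latency and failure are simulated.
data class FakeRegion(val name: String, val latencyMs: Long, val fails: Boolean = false)

// Hypothetical stand-in for canConnect(): throws like a network call would.
suspend fun probe(region: FakeRegion): Boolean {
    delay(region.latencyMs)
    if (region.fails) throw IOException("cannot reach ${region.name}")
    return true
}

suspend fun closestRegion(regions: List<FakeRegion>, timeoutMs: Long = 30_000): FakeRegion {
    val successes = channelFlow {
        for (region in regions) {
            launch {
                val ok = try {
                    probe(region)
                } catch (e: IOException) {
                    false // a network failure only means this region lost the race
                }
                if (ok) send(region)
            }
        }
    }
    // firstOrNull() returns the first region sent, or null once every child
    // has finished without sending; withTimeoutOrNull covers the slow case.
    val result = withTimeoutOrNull(timeoutMs) { successes.firstOrNull() }
    return requireNotNull(result) { "Failed to connect to any region" }
}
```

    Only IOException is caught, so cancellation still propagates normally. As far as I can tell the outer coroutineScope and cancelChildren() are not strictly needed in this variant, because taking the first element cancels the flow's remaining children, but keeping them does no harm.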
    

    I think your MutableSharedFlow technique could also work if you dropped the isActive check and used coroutineScope { } and cancelChildren() like I did above. But it seems awkward to create a shared flow that isn't shared by anything (it's only used by the same coroutine that created it).
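
    For completeness, here is an untested-in-production sketch of what that shared-flow variant might look like. DemoRegion, canConnect and firstReachable are hypothetical names with a simulated connection check, not the real Retrofit API. Note one limitation: a shared flow never completes, so if every region fails quickly this version still waits for the full timeout before throwing, which the channelFlow version avoids.

```kotlin
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for Region with a simulated outcome.
data class DemoRegion(val name: String, val latencyMs: Long, val reachable: Boolean)

// Hypothetical stand-in for canConnect().isSuccessful.
suspend fun canConnect(region: DemoRegion): Boolean {
    delay(region.latencyMs)
    return region.reachable
}

suspend fun firstReachable(regions: List<DemoRegion>, timeoutMs: Long = 30_000): DemoRegion =
    coroutineScope {
        val firstSuccess = MutableSharedFlow<DemoRegion>(replay = 1)
        for (region in regions) {
            // Children of this coroutineScope, so cancelChildren() below reaches them.
            launch {
                if (canConnect(region)) firstSuccess.emit(region)
            }
        }
        // No isActive check: first() takes one value and stops collecting.
        val result = withTimeoutOrNull(timeoutMs) { firstSuccess.first() }
        coroutineContext.cancelChildren() // stop the requests that are still running
        requireNotNull(result) { "Failed to connect to any region" }
    }
```

    The structure is the same as the channelFlow version. The only real difference is that the result travels through a hot flow that nothing else observes, which is why the channelFlow form reads more naturally to me.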