
Kotlin Coroutines - How to block to await/join all jobs?


I am new to Kotlin/Coroutines, so hopefully I am just missing something/don't fully understand how to structure my code for the problem I am trying to solve.

Essentially, I am taking a list of strings, and for each item in the list I want to send it to another method to do work (make a network call and return data based on the response). (Edit:) I want all calls to launch concurrently, and block until all calls are done/the response is acted on, and then return a new list with the info of each response.

I probably don't yet fully understand when to use launch/async, but I've tried the following with both launch (with joinAll) and async (with await).

fun processData(lstInputs: List<String>): List<response> {

    val lstOfReturnData = mutableListOf<response>()

    runBlocking {
        withContext(Dispatchers.IO) {
            val jobs = List(lstInputs.size) {
                launch {
                    lstOfReturnData.add(networkCallToGetData(lstInputs[it]))
                }
            }
            jobs.joinAll()
        }
    }

    return lstOfReturnData
}

What I am expecting is that if lstInputs has a size of 120, then once all jobs are joined, lstOfReturnData should also have a size of 120.

What is actually happening is inconsistent results. I'll run it once and get 118 in my final list, run it again and it's 120, run it again and it's 117, etc. In the networkCallToGetData() method, I am handling any exceptions so that something is returned for every request, regardless of whether the network call fails.

Can anybody help explain why I am getting inconsistent results, and what I need to do to ensure I am blocking appropriately and all jobs are being joined before moving on?


Solution

  • mutableListOf() creates an ArrayList, which is not thread-safe. The launched coroutines run on several Dispatchers.IO threads at once, so concurrent add() calls can overwrite each other and elements get lost, which is why the final size varies between runs.
    Try using ConcurrentLinkedQueue instead.

    Also, are you using the stable version of Kotlin/kotlinx.coroutines (not the old experimental one)? In the stable version, with the introduction of structured concurrency, there is no need to call jobs.joinAll() anymore. launch is an extension function on CoroutineScope, and the block passed to runBlocking has such a scope as its receiver, so coroutines launched inside it become children of runBlocking, which automatically waits for all of them to finish before it returns. So the code above can be shortened to

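    // requires: import java.util.concurrent.ConcurrentLinkedQueue
    val lstOfReturnData = ConcurrentLinkedQueue<response>()
    runBlocking {
        lstInputs.forEach {
            // Each launch is a child of runBlocking, which waits for all of them
            launch(Dispatchers.IO) {
                lstOfReturnData.add(networkCallToGetData(it))
            }
        }
    }
    // Copy into a List to match the declared return type;
    // elements are in completion order, not input order
    return lstOfReturnData.toList()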
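
Since the question also mentions async/await, here is a minimal sketch of that variant, which avoids a shared mutable collection altogether: each coroutine returns its own result and awaitAll() collects them in input order. The Response class and the body of networkCallToGetData() are hypothetical stand-ins (the original versions are not shown in the question), and the blocking Thread.sleep only simulates a network call.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the question's response type.
data class Response(val input: String, val body: String)

// Hypothetical stand-in for the real network call; sleeps to simulate blocking I/O.
fun networkCallToGetData(input: String): Response {
    Thread.sleep(10)
    return Response(input, "data for $input")
}

fun processData(lstInputs: List<String>): List<Response> = runBlocking {
    lstInputs
        // Start one coroutine per input on the IO dispatcher; nothing is shared between them.
        .map { input -> async(Dispatchers.IO) { networkCallToGetData(input) } }
        // Suspend until every Deferred completes; results keep the order of the inputs.
        .awaitAll()
}

fun main() {
    val results = processData(List(120) { "item$it" })
    println(results.size) // prints 120
}
```

Because no coroutine writes to a shared list, there is nothing to synchronize, and the result is already a List, so no conversion is needed before returning it.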