
Kotlin Coroutine Channel not queueing


Use case: The function processData() can be called by many coroutines in parallel, but the calls must be processed one after the other, like a queue, and must never run in parallel.

I used a channel to achieve this, but the function is still executed in parallel. How can I ensure that processData() runs only after any ongoing execution of the same function has completed? What am I doing wrong here?

My implementation:

class MyClass {

    private var channel: Channel<Long> = Channel()

    // Public function that other classes can invoke in parallel; it in turn invokes processData.
    suspend fun call() {
        coroutineScope {
            launch {
                val timeStamp = System.currentTimeMillis()
                Log.i("testing", "send :$timeStamp")
                channel.send(timeStamp)
            }
            processData(channel.receive())
        }
    }


    private suspend fun processData(timeStamp : Long) {
        Log.i("testing", "processData: start :$timeStamp")
        delay(5000L)
        Log.i("testing", "processData: end :$timeStamp")
        return
    }
}

The Logcat logs have been cleaned up for readability; the values of System.currentTimeMillis() have been replaced with 1, 2, 3, 4 and 5.

Expected Logcat (not exact, just the general idea):

send :1
processData: start :1
send :2
send :3
send :4
send :5
processData: end :1
processData: start :2
processData: end :2
processData: start :3
processData: end :3
processData: start :4
processData: end :4
processData: start :5
processData: end :5

Current Logcat:

send :1
processData: start :1

send :2
processData: start :2

send :3
processData: start :3

send :4
processData: start :4

send :5
processData: start :5

processData: end :1
processData: end :2
processData: end :3
processData: end :4
processData: end :5

Solution

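  • Try using a Mutex, so that only one coroutine at a time can run the body of processData():

    import kotlinx.coroutines.sync.Mutex
    import kotlinx.coroutines.sync.withLock
    
    class MyClass {
         private val mutex = Mutex()
         ...
         private suspend fun processData(timeStamp: Long) {
             // withLock suspends until the mutex is free and releases it when the block exits
             mutex.withLock {
                 // do your non-parallel work here
             }
         }
    }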
    

    Nice tutorial: https://medium.com/mobile-app-development-publication/mutex-for-coroutines-5f4a4ca60763
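
As for why the channel version does not queue: every call() launches its own sender and performs its own receive(), so the channel only hands each value to whichever caller is waiting, and each caller then runs processData() in its own coroutine. Nothing in that setup limits how many executions are active at once. The following is a minimal, self-contained sketch of the Mutex approach, not a drop-in replacement for the original class. It makes a few assumptions: call() takes a hypothetical id parameter instead of reading System.currentTimeMillis(), a hypothetical events list stands in for Log.i so the code runs outside Android, and the delay is shortened to 100 ms.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class MyClass {
    private val mutex = Mutex()

    // Hypothetical stand-in for Log.i: records messages in the order they occur.
    // Safe here only because main() below runs everything on a single thread.
    val events = mutableListOf<String>()

    // Hypothetical signature: the id is passed in rather than taken from the clock.
    suspend fun call(id: Long) {
        events += "send :$id"
        processData(id)
    }

    private suspend fun processData(id: Long) {
        // Only one coroutine at a time gets past this point; the others suspend.
        mutex.withLock {
            events += "processData: start :$id"
            delay(100L) // shortened from the 5000 ms in the question
            events += "processData: end :$id"
        }
    }
}

fun main() = runBlocking {
    val myClass = MyClass()
    // Five callers invoke call() concurrently.
    val jobs = (1L..5L).map { id -> launch { myClass.call(id) } }
    jobs.joinAll()
    myClass.events.forEach(::println)
}
```

With this sketch, every "processData: start" line is followed by the matching "processData: end" line before the next start appears, which is the shape of the expected Logcat in the question. If the real callers run on a multi-threaded dispatcher, the Mutex still serializes processData(), but the shared events list would need its own protection.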