kotlin kotlin-coroutines

How does Continuation work in Kotlin coroutines?


I am studying CPS (continuation-passing style) and I am wondering how it works.

Object createPost(
    Token token,
    Item item,
    Continuation<Post> cont) {...}

interface Continuation<in T> {
    val context: CoroutineContext
    fun resume(value: T)
    fun resumeWithException(exception: Throwable)
}

People say CPS is just callbacks and nothing more than that.

But

  1. I don't know why an interface is used here as a parameter.
  2. I don't know what <in T> does in the Continuation interface.
  3. Continuation is a parameter, but what does it actually do inside, and how is it called under the hood?

Solution

  • A continuation is an object that stores the state of a coroutine: the local variables that are still needed and the place where the coroutine was suspended. It is also the object we use to resume the coroutine, by calling resume(), resumeWith() or resumeWithException().

    In the examples below I use suspendCancellableCoroutine, a function from the kotlinx.coroutines library that suspends the coroutine, hands us its continuation, and also makes the suspension cancellable. The suspendCoroutine function from the Kotlin standard library could be used instead; it also suspends and provides the continuation object, but without cancellation support.
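    To make "an object we can use to resume the coroutine" concrete, here is a minimal sketch that uses only the standard library (suspendCoroutine and startCoroutine). The names pending, log, waitForId, worker and demo are made up for this illustration.

    ```kotlin
    import kotlin.coroutines.Continuation
    import kotlin.coroutines.EmptyCoroutineContext
    import kotlin.coroutines.resume
    import kotlin.coroutines.startCoroutine
    import kotlin.coroutines.suspendCoroutine

    // Hypothetical slot that keeps the captured continuation.
    var pending: Continuation<Int>? = null
    val log = mutableListOf<String>()

    // Suspends and stores its continuation in `pending` instead of resuming it.
    suspend fun waitForId(): Int = suspendCoroutine { cont -> pending = cont }

    suspend fun worker() {
        log.add("before")
        val id = waitForId()          // suspension point
        log.add("after $id")
    }

    fun demo(): List<String> {
        val block: suspend () -> Unit = { worker() }
        // Start the coroutine; it runs until waitForId() suspends.
        block.startCoroutine(Continuation(EmptyCoroutineContext) { log.add("done") })
        log.add("suspended")
        // Resuming the stored continuation continues worker() right after waitForId().
        pending!!.resume(7)
        return log
    }
    ```

    Calling demo() once should yield the entries "before", "suspended", "after 7", "done" in that order: the coroutine stops at the suspension point, control returns to the caller, and the stored continuation later carries the value 7 back into worker().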

    // code 1
    suspend fun getEmployee() {
        println("Start main()")
        var inc: Int = 0
        val id = getID()
        inc++
        println("End main() $inc $id")
    }
    

    Here the suspension point is getID(). The continuation stores the local variables of getEmployee() that are used after the suspension point; in this case that is inc, because it is read after getID() returns.

    There are a few ways in which suspending functions could have been implemented, but the Kotlin team decided on an option called continuation-passing style. This means that continuations are passed from function to function as arguments. By convention, a continuation takes the last parameter position. In our code it looks like this:

    suspend fun getStudent(rollno: Int): Student?
    suspend fun setStudentRollno(name: String): Unit 
    suspend fun showData(): Unit 
        
    // under hood 
    fun getStudent(rollno: Int, continuation: Continuation<*>): Any?
    fun setStudentRollno(name: String, continuation: Continuation<*>): Any
    fun showData(continuation: Continuation<*>): Any
    

    You might have also noticed that the result type under the hood is different from the originally declared one. It has changed to Any or Any?. Why so? The reason is that a suspending function might be suspended, and so it might not return a declared type. In such a case, it returns a special COROUTINE_SUSPENDED marker, which we will later see in practice. For now, just notice that since getStudent might return Student? or COROUTINE_SUSPENDED (which is of type Any), its result type must be the closest supertype of Student? and Any, so it is Any?. We will discuss later in more detail.
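    The two possible return values can be observed with the low-level startCoroutineUninterceptedOrReturn intrinsic from kotlin.coroutines.intrinsics, which exposes the raw Any? result of starting a suspending block. This is only a sketch: parked, readyNow, notYet, rawResult and resumeParked are names invented for the example.

    ```kotlin
    import kotlin.coroutines.Continuation
    import kotlin.coroutines.EmptyCoroutineContext
    import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
    import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
    import kotlin.coroutines.resume
    import kotlin.coroutines.suspendCoroutine

    // Hypothetical slot that keeps the continuation of a suspended call.
    var parked: Continuation<Int>? = null

    // Never suspends, so the raw result is the Int itself.
    suspend fun readyNow(): Int = 42

    // Always suspends, so the raw result is the COROUTINE_SUSPENDED marker.
    suspend fun notYet(): Int = suspendCoroutine { cont -> parked = cont }

    // Starts `block` on the current thread and returns the raw Any? result.
    // `onResume` is called only if the block suspended and was resumed later.
    fun rawResult(onResume: (Int) -> Unit = {}, block: suspend () -> Int): Any? =
        block.startCoroutineUninterceptedOrReturn(
            Continuation(EmptyCoroutineContext) { result -> onResume(result.getOrThrow()) }
        )

    // Delivers a value to the parked continuation, if there is one.
    fun resumeParked(value: Int) { parked?.resume(value) }
    ```

    With these definitions, rawResult { readyNow() } returns 42 directly, while rawResult { notYet() } returns COROUTINE_SUSPENDED and the Int only arrives through the continuation once resumeParked(...) is called.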

    // code 1 under hood look like  
    fun getEmployee(continuation: Continuation<*>): Any 
    fun getID(continuation: Continuation<*>): Any
    

    Next, getEmployee() needs its own continuation in order to remember its state. Let's name it GetEmployeeContinuation. When getEmployee() is called, it first checks whether the continuation it received as a parameter is its own continuation (GetEmployeeContinuation) or the caller's continuation. If it is the caller's continuation, this is a fresh call and the body starts from the beginning. If it is its own continuation, this is a request to resume after the suspension point (getID()). We will see below who makes that request.

    At the beginning of its body, getEmployee() wraps the continuation of its caller (the parameter) in its own continuation (GetEmployeeContinuation).

    val currentContinuation = GetEmployeeContinuation(continuation)
    

    This should be done only if the continuation isn't wrapped already. If it is, this call is part of the resume process, and we should keep the continuation unchanged (the pseudo code further down shows how this is done).

    val currentContinuation =
      if (continuation is GetEmployeeContinuation) continuation
      else GetEmployeeContinuation(continuation)
    

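    Now let's look at getID():

    import kotlin.concurrent.thread
    import kotlinx.coroutines.suspendCancellableCoroutine

    suspend fun getID(): Int {
        var text = "hello world"
        println("before")
        // The continuation is resumed with Unit, so the type argument is Unit
        suspendCancellableCoroutine<Unit> { getIDContinuation ->
            thread {
                Thread.sleep(1000)
                getIDContinuation.resumeWith(Result.success(Unit))
            }
        }
        text += "hh"
        println("after. $text")
        return 1
    }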
    

    The basic idea: resume() and resumeWithException() are just extension functions built on top of resumeWith():

    inline fun <T> Continuation<T>.resume(value: T): Unit =
        resumeWith(Result.success(value))
    
    inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit = 
        resumeWith(Result.failure(exception))
    

    If the result is a success, the coroutine resumes and the value becomes the return value of the suspension point; if it is a failure, the exception is rethrown right at the suspension point.

    The getEmployee() function can be started from two places: either from the beginning (in the case of a first call) or from the point after suspension (in the case of resuming from a continuation). To identify the current state, we use a field called label. At the start it is 0, so the function starts from the beginning. It is set to the next state before each suspension point, so that after a resume we continue from just after that suspension point.

    The code that continuations and suspending functions are actually compiled to is more complicated; what follows is simplified pseudo code.
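    A small sketch of both outcomes, again using only the standard library. fetchId, describe and runDescribe are hypothetical names; fetchId resumes its continuation immediately, so nothing really waits here.

    ```kotlin
    import kotlin.coroutines.Continuation
    import kotlin.coroutines.EmptyCoroutineContext
    import kotlin.coroutines.resume
    import kotlin.coroutines.resumeWithException
    import kotlin.coroutines.startCoroutine
    import kotlin.coroutines.suspendCoroutine

    // Hypothetical suspending call that resumes with a value or with an exception.
    suspend fun fetchId(fail: Boolean): Int = suspendCoroutine { cont ->
        if (fail) cont.resumeWithException(IllegalStateException("no id"))
        else cont.resume(1)
    }

    // The outcome of the suspension point appears as a return value or as a throw.
    suspend fun describe(fail: Boolean): String =
        try {
            "id=${fetchId(fail)}"
        } catch (e: IllegalStateException) {
            "failed: ${e.message}"
        }

    // Runs describe() to completion on the current thread and returns its result.
    fun runDescribe(fail: Boolean): String {
        var out = ""
        val block: suspend () -> String = { describe(fail) }
        block.startCoroutine(Continuation(EmptyCoroutineContext) { out = it.getOrThrow() })
        return out
    }
    ```

    Here runDescribe(false) gives "id=1" and runDescribe(true) gives "failed: no id": the exception passed to resumeWithException() surfaces at the fetchId() call inside describe(), where an ordinary try/catch handles it.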

    getEmployee(): pseudo code

    class GetEmployeeContinuation(val cont: Continuation<Unit>) : Continuation<Int> {
        override val context: CoroutineContext
            get() = cont.context

        var label: Int = 0 // tracks the state
        var inc: Int = 0
        var result: Result<Any>? = null

        override fun resumeWith(result: Result<Int>) {
            this.result = result
            val res =
                try {
                    // request to resume
                    val r = getEmployee(this)
                    if (r == COROUTINE_SUSPENDED) return
                    Result.success(r as Unit)
                } catch (ex: Throwable) {
                    Result.failure(ex)
                }
            cont.resumeWith(res)
        }
    }

    fun getEmployee(cont: Continuation<*>): Any {
        val currentContinuation =
            if (cont is GetEmployeeContinuation) cont
            else GetEmployeeContinuation(cont as Continuation<Unit>)
        var result: Result<Any>? = currentContinuation.result
        if (currentContinuation.label == 0) {
            println("Start main()")
            currentContinuation.inc = 0
            // set the state so that a resume continues after the suspension point
            currentContinuation.label = 1
            val res = getID(currentContinuation)
            if (res == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
            result = Result.success(res as Int)
        }
        if (currentContinuation.label == 1) {
            val id = result!!.getOrThrow() as Int
            currentContinuation.inc = currentContinuation.inc + 1
            println("End main() ${currentContinuation.inc} $id")
            return Unit
        }
        error("Impossible")
    }
    

    getID(): pseudo code

    fun getID(cont: Continuation<*>): Any {
        val currentContinuation =
            if (cont is GetIDContinuation) cont
            else GetIDContinuation(cont as Continuation<Int>)
        if (currentContinuation.label == 0) {
            println("before")
            currentContinuation.text = "hello world"
            currentContinuation.label = 1
            // suspension point: another thread resumes us after one second
            thread {
                Thread.sleep(1000)
                currentContinuation.resumeWith(Result.success(Unit))
            }
            return COROUTINE_SUSPENDED
        }
        if (currentContinuation.label == 1) {
            currentContinuation.text += "hh"
            println("after ${currentContinuation.text}")
            return 1
        }
        error("Impossible")
    }
    
    class GetIDContinuation(val cont: Continuation<Int>) : Continuation<Unit> {
        override val context: CoroutineContext
            get() = cont.context
    
        var text: String? = null
        var result: Result<Any>? = null
        var label: Int = 0
    
        override fun resumeWith(result: Result<Unit>) {
            this.result = result
            val res =
                try {
                    val r = getID(this)
                    if (r == COROUTINE_SUSPENDED) return
                    Result.success(r as Int)
                } catch (ex: Exception) {
                    Result.failure(ex)
                }
            cont.resumeWith(res)
        }
    }
    

    Let's walk through one scenario.

    First, getEmployee() is called. It creates its own continuation object (GetEmployeeContinuation) and starts executing from the beginning. Before calling getID(), it sets GetEmployeeContinuation.label = 1, so that a later resume continues after the suspension point, and it keeps its local variable inc in the GetEmployeeContinuation object. It then calls getID(), passing its own continuation. getID() in turn creates its own continuation object (GetIDContinuation), starts executing from the beginning, and reaches its suspension point (the Thread.sleep() step in the pseudo code). Before suspending, getID() also sets its label to 1, and then it returns COROUTINE_SUSPENDED. Back in getEmployee(), res = getID(currentContinuation) is therefore COROUTINE_SUSPENDED, so getEmployee() returns COROUTINE_SUSPENDED as well. The whole call stack unwinds this way, so the thread is released and can do something else.

    After 1 second this line runs:
    getIDContinuation.resumeWith(Result.success(Unit)). It invokes resumeWith() of the GetIDContinuation class with Unit as the result, which is why GetIDContinuation implements Continuation<Unit>.

    In resumeWith() of GetIDContinuation:

    The cont variable is of type Continuation<Int>, which means it expects a value of type Int when it is resumed (it is the GetEmployeeContinuation of the caller). When a continuation is resumed, its resumeWith() method is called with a Result parameter, and that Result is stored in the continuation's result field. In this case the value is an Int, but the field is declared as Result<Any>?, so the stored value has the static type Any and is cast back to Int when it is read.

    Because getIDContinuation.label was set to 1 before the suspension (the Thread.sleep step in the pseudo code), execution continues from where it was suspended: resumeWith() of GetIDContinuation calls getID() again, passing its own continuation object (which means "resume"), to obtain the value of r. This time getID() runs to completion and returns 1, so r is 1 and res is Result.success(1). Then this line runs: cont.resumeWith(res).

    Here cont is the GetEmployeeContinuation, so resumeWith() of GetEmployeeContinuation is called with Result.success(1) and its result field is updated to that value. Since GetEmployeeContinuation.label was set to 1 before calling getID(), execution continues from where it left off: resumeWith() calls getEmployee() again with its own continuation object (GetEmployeeContinuation), getEmployee() runs to completion and returns Unit, which becomes the value of res. GetEmployeeContinuation then resumes its own caller when cont.resumeWith(res) executes, and the process continues up the entire call stack.
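    The walkthrough can be tried out with a hand-written, runnable version of the two state machines. This is a simplified sketch and not what the compiler really generates: the class names GetIdCont and GetEmployeeCont, the trace list, the 50 ms delay and the runGetEmployee helper are assumptions made for the example, and messages are collected in a list instead of printed.

    ```kotlin
    import java.util.Collections
    import java.util.concurrent.CountDownLatch
    import kotlin.concurrent.thread
    import kotlin.coroutines.Continuation
    import kotlin.coroutines.CoroutineContext
    import kotlin.coroutines.EmptyCoroutineContext
    import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED

    // Messages are collected instead of printed; the list is shared by two threads.
    val trace: MutableList<String> = Collections.synchronizedList(mutableListOf<String>())

    class GetIdCont(val caller: Continuation<Int>) : Continuation<Unit> {
        override val context: CoroutineContext get() = caller.context
        var label = 0
        var text = ""
        override fun resumeWith(result: Result<Unit>) {
            val r = try {
                getId(this)                   // re-enter getId at the saved label
            } catch (e: Throwable) {
                caller.resumeWith(Result.failure(e))
                return
            }
            if (r !== COROUTINE_SUSPENDED) caller.resumeWith(Result.success(r as Int))
        }
    }

    fun getId(cont: Continuation<*>): Any {
        @Suppress("UNCHECKED_CAST")
        val sm = cont as? GetIdCont ?: GetIdCont(cont as Continuation<Int>)
        if (sm.label == 0) {
            trace.add("before")
            sm.text = "hello world"
            sm.label = 1
            thread {                          // resume later from another thread
                Thread.sleep(50)
                sm.resumeWith(Result.success(Unit))
            }
            return COROUTINE_SUSPENDED
        }
        sm.text += "hh"
        trace.add("after ${sm.text}")
        return 1
    }

    class GetEmployeeCont(val caller: Continuation<Unit>) : Continuation<Int> {
        override val context: CoroutineContext get() = caller.context
        var label = 0
        var inc = 0
        var result: Result<Int>? = null
        override fun resumeWith(result: Result<Int>) {
            this.result = result
            val r = try {
                getEmployee(this)             // re-enter getEmployee at the saved label
            } catch (e: Throwable) {
                caller.resumeWith(Result.failure(e))
                return
            }
            if (r !== COROUTINE_SUSPENDED) caller.resumeWith(Result.success(Unit))
        }
    }

    fun getEmployee(cont: Continuation<*>): Any {
        @Suppress("UNCHECKED_CAST")
        val sm = cont as? GetEmployeeCont ?: GetEmployeeCont(cont as Continuation<Unit>)
        if (sm.label == 0) {
            trace.add("Start main()")
            sm.inc = 0
            sm.label = 1
            val r = getId(sm)
            if (r === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
            sm.result = Result.success(r as Int)
        }
        val id = sm.result!!.getOrThrow()
        sm.inc++
        trace.add("End main() ${sm.inc} $id")
        return Unit
    }

    // Starts getEmployee, reports whether the first call suspended, waits for the end.
    fun runGetEmployee(): Pair<Boolean, List<String>> {
        val done = CountDownLatch(1)
        val completion = Continuation<Unit>(EmptyCoroutineContext) { done.countDown() }
        val suspended = getEmployee(completion) === COROUTINE_SUSPENDED
        if (suspended) done.await()
        return suspended to trace.toList()
    }
    ```

    Calling runGetEmployee() once should report that the first call returned COROUTINE_SUSPENDED and produce the trace "Start main()", "before", "after hello worldhh", "End main() 1 1". The last two entries are written by the background thread, which is the one that drives the chain of resumeWith() calls back up to the caller.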

    I found an article about how coroutines work under the hood, which helped me write this answer.