Tags: kotlin, kotlin-coroutines, blocking

How to pause and resume IO tasks?


I have a function that performs a long-running task on an IO thread.

For example:

/**
 * Copies `D:\Games\Eg.7z` to `D:\New folder\Eg.7z` on [Dispatchers.IO].
 *
 * The whole transfer happens inside a single `copyTo` call.
 */
suspend fun fx() {
    withContext(Dispatchers.IO) {
        val archive = Path("""D:\Games\Eg.7z""")
        val archiveCopy = Path("""D:\New folder\Eg.7z""")
        archive.copyTo(archiveCopy) // takes a while to complete
    }
}

I want to pause the IO operation from a keyPressed event handler running on Dispatchers.Main, and resume it at will.

How do I pause and resume a coroutine task?

Update: From the answer, it seems this is outside the scope of coroutines. So how else can we pause and resume IO tasks, if not with coroutines?


Solution

  • With more reading and testing, I've managed to make a simple version. Adapt it for folders and your own specific use cases.

    
    /** Control signal for a copy task; the copy loop reads it before handling each chunk. */
    enum class TaskSwitch {
        /** The copy proceeds: the current chunk is written and the next one is read. */
        Running,

        /** The copy suspends until the switch takes a different value. */
        Paused,

        /** The copy loop stops before writing the current chunk. */
        Cancelled,
    }
    
    /**
     * Copies [source] to [destination] one buffer at a time, as a flow that emits the running
     * total of bytes written after every chunk. The copy work runs on [Dispatchers.IO].
     *
     * The copy is steered through [switch], which is read before every chunk:
     * - [TaskSwitch.Running]: the chunk is written and the next one is read.
     * - [TaskSwitch.Paused]: the flow suspends until [switch] holds a different value.
     * - [TaskSwitch.Cancelled]: the loop stops without writing the pending chunk.
     *
     * @param source file to read from.
     * @param destination file to write to.
     * @param switch current pause/resume/cancel state; only read by this function.
     * @param deleteIncomplete when `true`, a partially written [destination] is deleted if the copy
     *   stops early, whether through [TaskSwitch.Cancelled], an I/O failure, or cancellation of
     *   the collecting coroutine.
     * @param bufferSize size in bytes of each chunk; must be positive.
     * @throws IllegalArgumentException on collection, if [bufferSize] is not positive.
     */
    fun incrementalCopy(
        source: Path,
        destination: Path,
        switch: MutableStateFlow<TaskSwitch>,
        deleteIncomplete: Boolean = true,
        bufferSize: Int = DEFAULT_BUFFER_SIZE
    ) = flow {
        // With a zero-length buffer InputStream.read returns 0 forever, so the loop would never end.
        require(bufferSize > 0) { "bufferSize must be positive, was $bufferSize" }

        // Set once the destination stream exists; before that, a failure must not delete
        // whatever file may already be sitting at `destination`.
        var destinationOpened = false
        var cancelledBySwitch = false
        var bytesCopied = 0L

        try {
            source.inputStream().use { inputStream ->
                destination.outputStream().use { outputStream ->
                    destinationOpened = true

                    val buffer = ByteArray(bufferSize)
                    var bytes = inputStream.read(buffer)
                    while (bytes >= 0) {
                        when (val taskSwitch = switch.value) {
                            TaskSwitch.Running -> {
                                outputStream.write(buffer, 0, bytes)
                                bytesCopied += bytes
                                emit(bytesCopied)

                                bytes = inputStream.read(buffer)
                            }

                            TaskSwitch.Paused -> {
                                // Suspends until the switch is no longer Paused; the chunk already
                                // read stays in `buffer` and is handled on the next iteration.
                                switch.first {
                                    it != taskSwitch
                                }
                            }

                            TaskSwitch.Cancelled -> {
                                cancelledBySwitch = true
                                break
                            }
                        }
                        // Gives coroutine cancellation a chance to take effect between chunks.
                        yield()
                    }
                }
            }
        } catch (failure: Throwable) {
            // Both streams are closed by now. Remove the truncated file, then rethrow so that
            // I/O errors and coroutine cancellation still reach the caller unchanged.
            if (deleteIncomplete && destinationOpened) {
                try {
                    destination.deleteIfExists()
                } catch (cleanupFailure: Exception) {
                    failure.addSuppressed(cleanupFailure)
                }
            }
            throw failure
        }

        if (cancelledBySwitch && deleteIncomplete) {
            destination.deleteIfExists()
        }

    }.flowOn(Dispatchers.IO)
    
    
    /**
     * Demo: starts an incremental copy, pauses it after 5 seconds, and resumes it 10 seconds later.
     * Every progress value emitted by the copy is printed, followed by "Finished!" at the end.
     */
    fun main(): Unit = runBlocking {
        val sourceFile = Path("D:\\Games\\Age-Of-Empires-II-HD.rar")
        val targetFile = Path("D:\\New folder\\A\\B\\Age-Of-Empires-II-HD.rar")
        // The destination's directory tree has to exist before the copy opens the file.
        targetFile.parent.createDirectories()

        val switch = MutableStateFlow(TaskSwitch.Running)

        // The copy runs in a child coroutine so this one stays free to flip the switch.
        launch {
            incrementalCopy(sourceFile, targetFile, switch).collectLatest { copiedSoFar ->
                println(copiedSoFar)
            }
            println("Finished!")
        }

        delay(5000L)
        switch.value = TaskSwitch.Paused
        println("Paused")

        delay(10000L)
        println("Running")
        switch.value = TaskSwitch.Running

        // To cancel the copy instead, uncomment:
    //    delay(1000L)
    //    switch.value = TaskSwitch.Cancelled
    //    println("Canceled")
    }