android kotlin kotlin-coroutines couchbase-lite

How to get notified when a coroutine channel finishes a job


I have an Android app that uses Couchbase Lite. I'm trying to save a document and get an acknowledgement using a coroutine channel. The reason I use a channel is to make sure every operation runs in the same scope.

Here is my attempt, based on the accepted answer to "How to properly have a queue of pending operations using Kotlin Coroutines?":

object DatabaseQueue {
    private val scope = CoroutineScope(IOCoroutineScope)
    private val queue = Channel<Job>(Channel.UNLIMITED)

    init {
        scope.launch(Dispatchers.Default) {
            for (job in queue) job.join()
        }
    }

    fun submit(
        context: CoroutineContext = EmptyCoroutineContext,
        block: suspend CoroutineScope.() -> Unit
    ) {
        val job = scope.launch(context, CoroutineStart.LAZY, block)
        queue.trySendBlocking(job)
    }

    fun submitAsync(
        context: CoroutineContext = EmptyCoroutineContext,
        id: String,
        database: Database
    ): Deferred<Document?> {
        val job = scope.async(context, CoroutineStart.LAZY) {
            database.getDocument(id)
        }
        queue.trySendBlocking(job)
        return job
    }

    fun cancel() {
        queue.cancel()
        scope.cancel()
    }
}

fun Database.saveDocument(document: MutableDocument) {
    DatabaseQueue.submit {
        Timber.tag("quechk").d("saving :: ${document.id}")
        this@saveDocument.save(document)
    }
}

fun Database.getDocumentQ(id: String): Document? {
    return runBlocking {
        DatabaseQueue.submitAsync(id = id, database = this@getDocumentQ).also {
            Timber.tag("quechk").d("getting :: $id")
        }.await()
    }
}

My issue is that when I have many database writes and reads queued, the reads run before the writes have finished, so they return null. So, what I need to know is


Solution

  • By modifying the original solution, you broke it. The whole idea was to create an inactive (lazy) coroutine for each submitted block of code and then start these coroutines one by one. In your case you exposed a Deferred to the caller, and calling await() on a lazily started Deferred starts it. The caller can therefore start a coroutine before the queue reaches it, so the coroutines no longer run sequentially but concurrently.
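
    The effect can be reproduced without Couchbase Lite. The following is only a minimal sketch, assuming kotlinx.coroutines is on the classpath; lazyAwaitOrder is a hypothetical name, and the two coroutines merely stand in for a queued write and a queued read. The "write" is created first, but the caller awaits the "read" Deferred before anything has started the write:

    ```kotlin
    import kotlinx.coroutines.CoroutineStart
    import kotlinx.coroutines.async
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking

    // Hypothetical helper: returns the order in which the two lazy coroutines ran.
    fun lazyAwaitOrder(): List<String> = runBlocking {
        val order = mutableListOf<String>()

        // Created first, like a queued write that the worker has not reached yet.
        val write = launch(start = CoroutineStart.LAZY) { order += "write" }
        // Created second, and handed to the caller as a Deferred.
        val read = async(start = CoroutineStart.LAZY) { order += "read"; "doc" }

        read.await()  // await() starts the lazy coroutine itself
        write.join()  // the write only starts now, after the read has finished
        order
    }
    ```

    Here the read runs before the write even though it was created later, which matches the null results described in the question.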

    The easiest way to fix this while keeping almost the same code is to introduce another Deferred that is not directly tied to the queued coroutine:

    fun submitAsync(
        context: CoroutineContext = EmptyCoroutineContext,
        id: String,
        database: Database
    ): Deferred<Document?> {
        val ret = CompletableDeferred<Document?>()
        val job = scope.launch(context, CoroutineStart.LAZY) {
            ret.completeWith(runCatching { database.getDocument(id) })
        }
        queue.trySendBlocking(job)
        return ret
    }
    

    However, depending on your case, this may be overkill. For example, if you don't need to guarantee strict FIFO ordering, a simple Mutex would be enough. Also, please note that the classic approach of returning futures/deferreds only to await them is an anti-pattern in coroutines. We should simply write a suspend function and call it directly.
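
The Mutex variant with plain suspend functions could look roughly like this. It is a sketch under assumptions, not a drop-in replacement: FakeDatabase is a hypothetical in-memory stand-in for the Couchbase Lite Database so that the snippet runs without the SDK, and DatabaseLock, withDatabase, saveDocumentLocked, getDocumentLocked and roundTrip are illustrative names that do not appear in the question.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

// Hypothetical stand-in for the real Database class.
class FakeDatabase {
    private val docs = HashMap<String, String>()
    fun save(id: String, body: String) { docs[id] = body }
    fun getDocument(id: String): String? = docs[id]
}

// Serializes database access: only one block runs inside the lock at a time.
object DatabaseLock {
    private val mutex = Mutex()

    suspend fun <T> withDatabase(block: () -> T): T =
        mutex.withLock { withContext(Dispatchers.IO) { block() } }
}

// Suspends until the write has actually completed.
suspend fun FakeDatabase.saveDocumentLocked(id: String, body: String) =
    DatabaseLock.withDatabase { save(id, body) }

// Suspends until the read has completed and returns its result directly.
suspend fun FakeDatabase.getDocumentLocked(id: String): String? =
    DatabaseLock.withDatabase { getDocument(id) }

// Write, then read, from a single coroutine.
fun roundTrip(): String? = runBlocking {
    val db = FakeDatabase()
    db.saveDocumentLocked("doc-1", "hello")
    db.getDocumentLocked("doc-1")
}
```

Because saveDocumentLocked only returns once the write is done, a caller that saves and then reads from the same coroutine gets the acknowledgement for free, with no Deferred and no runBlocking inside the database helpers.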