kotlinconcurrencyarchitecturekotlin-coroutines

Building a suspending resource pool using Kotlin Coroutines


I'm exploring how to build a resource pool that suspends until a resource is available. This pool can be anything, from slots to allow API calls to socket connections to image processors or in this sample code, integers which could be available service counters:

val pool = Pool(1, 2, 3)


repeat(100) {
    launch {
        pool.borrow {
            println(it)
            delay(1000)
        }
    }
}

The borrow function grabs one item from the resource pool and returns it once it's done.

What I've done so far is this:

class Pool(vararg resource: Int) {
    private val mutex = Mutex()
    private val list = mutableListOf(*resource.toTypedArray())
    private var available: CompletableDeferred<Boolean>? = null

    suspend fun add(value: Int) {
        mutex.withLock {
            list.add(value)
            available?.complete(true)
        }
    }

    suspend fun rem(): Int {
        mutex.withLock {
            if (list.size == 1) {
                available = CompletableDeferred()
            } else {
                available?.await()
            }
            return list.removeLast()
        }
    }

    suspend fun borrow(handler: suspend (Int) -> Unit) {
        val borrowed = rem()
        try {
            handler(borrowed)
        } finally {
            add(borrowed)
        }
    }
}

Both add and remove runs inside mutex.withLock { ... } to ensure we don't get a concurrent modification exception when two threads are trying to modify the same list.

Initially, available is null, so any available?.await() will be skipped due to the null check. Once you remove the last item from the list (list.size == 1), available is set to a CompletableDeferred which means available?.await() will now suspend until available?.complete(true) is called.

Once more items are added, available?.complete(true) is called again which will stop available?.await() from suspending. If you try to remove an item, this code will deadlock, available?.await() will suspend which means the mutex never exits preventing add from being called again to allow the rem from unsuspending again.

If I move that available?.await() before the mutex, two threads will try to remove an item from a list that might only have one item in it and the second thread will hit a List is emptyerror.

What is the correct way to implement such a suspending resource pool?


Solution

  • I think this would be a lot easier to build using a Channel. You don't even need to make add suspending because you can use trySend from any thread safely when the Channel has an unlimited capacity.

    I also suggest making borrow inline and removing suspend from its function parameter. This avoids a function wrapper allocation when you use it. Since it's inline, you'll still be able to call suspend functions in the lambda you pass to it even if it's not a suspend lambda.

    class Pool<T>(vararg initialResources: T) {
        private val channel = Channel<T>(
          capacity = Channel.UNLIMITED,
          onUndeliveredElement = { resource -> 
            // Race condition bug: a receiver was canceled before it could 
            // receive this value. Manually re-add it back to the pool.
            add(resource)
          },
        )
        
        init {
            for (res in initialResources) {
                channel.trySend(res)
            }
        }
    
        fun add(value: T) {
           channel.trySend(value)
        }
    
        suspend fun rem(): T = channel.receive()
    
        suspend inline fun borrow(handler: (T) -> Unit) {
            val borrowed = rem()
            try {
                handler(borrowed)
            } finally {
                add(borrowed)
            }
        }
    }