I have a Kotlin JVM server application using coroutines, and I need to put a cache in front of a non-blocking network call. I figure I can use a Caffeine AsyncLoadingCache to get the non-blocking cache behaviour I need. The AsyncCacheLoader interface I would need to implement uses CompletableFuture. Meanwhile, the method I want to call to load the cache entries is a suspend function.
I can bridge the gap like this:
abstract class SuspendingCacheLoader<K, V> : AsyncCacheLoader<K, V> {
    abstract suspend fun load(key: K): V

    final override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> {
        return GlobalScope.async(executor.asCoroutineDispatcher()) {
            load(key)
        }.asCompletableFuture()
    }
}
This will run the load function on the provided Executor (by default, the common ForkJoinPool), which from the point of view of Caffeine is the correct behaviour.
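For illustration, here is a minimal sketch of how such a loader might be wired into Caffeine. The LengthLoader class and its simulated delay are hypothetical stand-ins for the real network call, and the non-null bounds on K and V, as well as the opt-in annotation for GlobalScope, are additions made for this sketch rather than part of the code above.

```kotlin
import com.github.benmanes.caffeine.cache.AsyncCacheLoader
import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.Caffeine
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.asCompletableFuture
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor

// The bridge from above, repeated so the sketch is self-contained.
// The `: Any` bounds are an assumption added for this sketch.
abstract class SuspendingCacheLoader<K : Any, V : Any> : AsyncCacheLoader<K, V> {
    abstract suspend fun load(key: K): V

    @OptIn(DelicateCoroutinesApi::class)
    final override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> =
        GlobalScope.async(executor.asCoroutineDispatcher()) { load(key) }.asCompletableFuture()
}

// Hypothetical loader: stands in for the real non-blocking network call.
class LengthLoader : SuspendingCacheLoader<String, Int>() {
    override suspend fun load(key: String): Int {
        delay(10) // simulated latency; suspends without blocking a thread
        return key.length
    }
}

fun main() = runBlocking<Unit> {
    val cache: AsyncLoadingCache<String, Int> =
        Caffeine.newBuilder().maximumSize(100).buildAsync(LengthLoader())

    // get() returns a CompletableFuture, which await() turns back into a suspension point.
    println(cache.get("hello").await())
}
```

From the caller's side, the cache still hands back a CompletableFuture, so coroutine callers need await() (or similar) to get back into suspending code.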
However, I know that I should try to avoid using GlobalScope to launch coroutines.
I considered having my SuspendingCacheLoader implement CoroutineScope and manage its own coroutine context. But CoroutineScope is intended to be implemented by objects with a managed lifecycle. Neither the cache nor the AsyncCacheLoader has any lifecycle hooks. The cache owns the Executor and the CompletableFuture instances, so it already controls the lifecycle of the loading tasks that way. I can't see that having the tasks be owned by a coroutine context would add anything, and I'm worried that I wouldn't be able to correctly close the coroutine context after the cache stopped being used.
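To make the trade-off concrete, a scope-owning loader might look roughly like the sketch below. ScopedCacheLoader, its loadValue parameter, and the choice of AutoCloseable are all hypothetical names and decisions made for illustration. It holds a scope rather than implementing CoroutineScope directly, but the lifecycle concern is the same.

```kotlin
import com.github.benmanes.caffeine.cache.AsyncCacheLoader
import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.Caffeine
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import kotlinx.coroutines.future.future
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor

// Hypothetical loader that owns its own scope instead of using GlobalScope.
class ScopedCacheLoader<K : Any, V : Any>(
    private val loadValue: suspend (K) -> V
) : AsyncCacheLoader<K, V>, AutoCloseable {

    // SupervisorJob so that one failed load does not cancel the others.
    private val scope = CoroutineScope(SupervisorJob())

    override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> =
        scope.future(executor.asCoroutineDispatcher()) { loadValue(key) }

    // Cancels any loads still in flight.
    override fun close() = scope.cancel()
}

fun main() {
    val loader = ScopedCacheLoader<String, Int> { key -> key.length }
    val cache: AsyncLoadingCache<String, Int> = Caffeine.newBuilder().buildAsync(loader)

    println(cache.get("hello").join())

    // Nothing in Caffeine calls this; whoever owns the cache has to remember to.
    loader.close()
}
```

The close() call is the weak point: the cache never invokes it, so it only helps if some other component with a real lifecycle takes responsibility for the loader.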
Writing my own asynchronous caching mechanism would be prohibitively difficult, so I'd like to integrate with the Caffeine implementation if I can.
Is using GlobalScope the right approach to implement AsyncCacheLoader, or is there a better solution?
After some thought I've come up with a much simpler solution that I think uses coroutines more idiomatically.
The approach works by using AsyncCache.get(key, mappingFunction) instead of implementing an AsyncCacheLoader. However, it ignores the Executor that the cache is configured to use, following the advice of some of the other answers here.
class SuspendingCache<K, V>(private val asyncCache: AsyncCache<K, V>) {
    suspend fun get(key: K): V = supervisorScope {
        getAsync(key).await()
    }

    private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, _ ->
        future {
            loadValue(k)
        }
    }

    private suspend fun loadValue(key: K): V = TODO("Load the value")
}
Note that this depends on kotlinx-coroutines-jdk8 for the future coroutine builder and the await() function.
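As a rough usage sketch, the class above can be made runnable by passing the loading function in through the constructor instead of leaving it as a TODO. The loadValue constructor parameter, the non-null bounds on K and V, and the counter are assumptions added for this example only.

```kotlin
import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Caffeine
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.await
import kotlinx.coroutines.future.future
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope
import java.util.concurrent.atomic.AtomicInteger

class SuspendingCache<K : Any, V : Any>(
    private val asyncCache: AsyncCache<K, V>,
    // Hypothetical parameter replacing the TODO in the version above.
    private val loadValue: suspend (K) -> V
) {
    suspend fun get(key: K): V = supervisorScope {
        // The mapping function only runs when the key is absent; other callers
        // asking for the same key await the future that is already cached.
        asyncCache.get(key) { k, _ -> future { loadValue(k) } }.await()
    }
}

fun main() = runBlocking<Unit> {
    val loads = AtomicInteger() // counts how often the loader actually runs
    val cache = SuspendingCache<String, Int>(Caffeine.newBuilder().buildAsync<String, Int>()) { key ->
        loads.incrementAndGet()
        delay(10) // stand-in for the real non-blocking network call
        key.length
    }

    // Several coroutines request the same key concurrently.
    val results = List(5) { async { cache.get("hello") } }.awaitAll()
    println("results=$results, loads=${loads.get()}")
}
```

Callers only ever see a suspend function here; the CompletableFuture stays an implementation detail between the wrapper and Caffeine.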
I think ignoring the Executor is probably the right choice. As @Kiskae points out, the cache will use the ForkJoinPool by default, and there is probably little benefit in running the load there rather than on the dispatcher the caller is already using. However, it would be easy to use the cache's Executor if we wanted to, by changing the getAsync function:
private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, executor ->
    future(executor.asCoroutineDispatcher()) {
        loadValue(k)
    }
}