Tags: kotlin, caching, kotlin-coroutines, caffeine

How do I use an async cache with Kotlin coroutines?


I have a Kotlin JVM server application using coroutines and I need to put a cache in front of a non-blocking network call. I figure I can use a Caffeine AsyncLoadingCache to get the non-blocking cache behaviour I need. The AsyncCacheLoader interface I would need to implement uses CompletableFuture. Meanwhile, the method I want to call to load the cache entries is a suspend function.

I can bridge the gap like this:

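import com.github.benmanes.caffeine.cache.AsyncCacheLoader
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.future.asCompletableFuture

abstract class SuspendingCacheLoader<K, V>: AsyncCacheLoader<K, V> {
    abstract suspend fun load(key: K): V

    // Run the suspending load on the cache's Executor and expose the result as a future.
    final override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> {
        return GlobalScope.async(executor.asCoroutineDispatcher()) {
            load(key)
        }.asCompletableFuture()
    }
}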

This will run the load function on the Executor that the cache provides (by default, ForkJoinPool.commonPool()), which from the point of view of Caffeine is the correct behaviour.
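
To make the wiring concrete, here is a minimal sketch of how such a loader could be plugged into an AsyncLoadingCache and awaited from a coroutine. GreetingLoader, its delay, and the maximumSize setting are placeholders invented for illustration; they stand in for the real network call and configuration.

```kotlin
import com.github.benmanes.caffeine.cache.AsyncCacheLoader
import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.Caffeine
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.asCompletableFuture
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking

// The bridge from the question, repeated so that the sketch is self-contained.
abstract class SuspendingCacheLoader<K : Any, V : Any> : AsyncCacheLoader<K, V> {
    abstract suspend fun load(key: K): V

    @OptIn(DelicateCoroutinesApi::class)
    final override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> =
        GlobalScope.async(executor.asCoroutineDispatcher()) { load(key) }
            .asCompletableFuture()
}

// Hypothetical loader: the delay stands in for a non-blocking network call.
class GreetingLoader : SuspendingCacheLoader<String, String>() {
    override suspend fun load(key: String): String {
        delay(10)
        return "hello, $key"
    }
}

fun main(): Unit = runBlocking {
    val cache: AsyncLoadingCache<String, String> =
        Caffeine.newBuilder().maximumSize(100).buildAsync(GreetingLoader())

    // get() returns a CompletableFuture; await() suspends instead of blocking.
    println(cache.get("world").await())
}
```

Since no executor is configured on the builder here, the load should run on the cache's default one.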

However, I know that I should try to avoid using GlobalScope to launch coroutines.

I considered having my SuspendingCacheLoader implement CoroutineScope and manage its own coroutine context. But CoroutineScope is intended to be implemented by objects with a managed lifecycle. Neither the cache nor the AsyncCacheLoader has any lifecycle hooks. The cache owns the Executor and the CompletableFuture instances, so it already controls the lifecycle of the loading tasks that way. I can't see that having the tasks be owned by a coroutine context would add anything, and I'm worried that I wouldn't be able to correctly close the coroutine context after the cache stopped being used.
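
For comparison, the scoped variant described above might look roughly like the sketch below. ScopedCacheLoader and its close() method are hypothetical names, and the sketch holds a scope as a property rather than implementing CoroutineScope directly. It illustrates the concern: nothing in Caffeine would ever call close(), so the application would have to remember to do it.

```kotlin
import com.github.benmanes.caffeine.cache.AsyncCacheLoader
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import kotlinx.coroutines.future.future

// Hypothetical alternative: the loader owns a scope instead of using GlobalScope.
abstract class ScopedCacheLoader<K : Any, V : Any> : AsyncCacheLoader<K, V>, AutoCloseable {
    // SupervisorJob, so that one failed load does not cancel its siblings.
    private val scope = CoroutineScope(SupervisorJob())

    abstract suspend fun load(key: K): V

    final override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> =
        scope.future(executor.asCoroutineDispatcher()) { load(key) }

    // Cancels any in-flight loads. The cache has no hook that would invoke this,
    // so it is up to the surrounding application (an assumption of this sketch).
    override fun close() = scope.cancel()
}
```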

Writing my own asynchronous caching mechanism would be prohibitively difficult, so I'd like to integrate with the Caffeine implementation if I can.

Is using GlobalScope the right approach to implement AsyncCacheLoader, or is there a better solution?


Solution

  • After some thought I've come up with a much simpler solution that I think uses coroutines more idiomatically.

    The approach works by calling AsyncCache.get(key, mappingFunction) instead of implementing an AsyncCacheLoader. Note that it ignores the Executor that the cache is configured to use, following the advice of some of the other answers here.

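    import com.github.benmanes.caffeine.cache.AsyncCache
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.future.await
    import kotlinx.coroutines.future.future
    import kotlinx.coroutines.supervisorScope

    class SuspendingCache<K, V>(private val asyncCache: AsyncCache<K, V>) {
        // Suspends until the cached (or newly loaded) value is available.
        suspend fun get(key: K): V = supervisorScope {
            getAsync(key).await()
        }

        // On a cache miss, start the load as a coroutine in the caller's scope;
        // the Executor argument is deliberately ignored.
        private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, _ ->
            future {
                loadValue(k)
            }
        }

        private suspend fun loadValue(key: K): V = TODO("Load the value")
    }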
    

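    Note that this depends on kotlinx-coroutines-jdk8 for the future coroutine builder and the await() function. (Since kotlinx.coroutines 1.7.0, that module has been merged into kotlinx-coroutines-core, so newer projects do not need the separate dependency.)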

    I think ignoring the Executor is probably the right choice. As @Kiskae points out, the cache will use the ForkJoinPool by default. Choosing to use that rather than the default coroutine dispatcher is probably not useful. However, it would be easy to use it if we wanted to, by changing the getAsync function:

    private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, executor ->
        future(executor.asCoroutineDispatcher()) { 
            loadValue(k) 
        }
    }
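
As a rough usage sketch of the approach above: the version below takes the loading function as a constructor parameter (an assumption made so the example can run; the original leaves loadValue as a TODO), and the counter and delay are placeholders standing in for the real network call.

```kotlin
import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Caffeine
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.await
import kotlinx.coroutines.future.future
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// Same shape as SuspendingCache above, with the loader injected (hypothetical).
class SuspendingCache<K : Any, V : Any>(
    private val asyncCache: AsyncCache<K, V>,
    private val loader: suspend (K) -> V,
) {
    suspend fun get(key: K): V = supervisorScope {
        getAsync(key).await()
    }

    private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, _ ->
        future { loader(k) }
    }
}

fun main(): Unit = runBlocking {
    val loads = AtomicInteger()
    val cache = SuspendingCache<String, String>(
        Caffeine.newBuilder().maximumSize(100).buildAsync<String, String>()
    ) { key ->
        loads.incrementAndGet()
        delay(50) // stand-in for the non-blocking network call
        "value-for-$key"
    }

    // Three concurrent requests for the same key should share one in-flight load.
    val results = List(3) { async { cache.get("a") } }.awaitAll()
    println(results)
    println("loads: ${loads.get()}")
}
```

One design consequence worth keeping in mind: the load coroutine is a child of whichever caller triggered the cache miss, so if that caller is cancelled, the shared future is cancelled as well.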