In our reactive application (Spring WebFlux, project Reactor) we carry important logging information within the reactive context. The problem is when we use the Caffeine AsyncCache
, the context is not automatically transferred between the calling Mono
and the caffeine cache Future
.
I'd like to share our solution as I think some others might have the same need.
The solution is in the code, with added comments
This is the main testing class:
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import java.util.Map;
class ContextFromMonoToCaffeineCache {
@Test
void context() {
final UserService userService = new UserService();
// Call the user service 4 times, each time with a different context.
// Fetch the user with ID=1 twice, using the method which loses the context.
// The 1st fetch is from the database, the 2nd is already from the cache.
userService.getUserWithoutContext(1)
.delayUntil(this::printUser)
.contextWrite(context -> context.put("myContext", "FIRST"))
.block();
userService.getUserWithoutContext(1)
.delayUntil(this::printUser)
.contextWrite(context -> context.put("myContext", "SECOND"))
.block();
// Fetch the user with ID=2 twice, using the context preserving fetch.
// The 1st fetch is from the database, the 2nd is already from the cache.
userService.getUserWithContext(2)
.delayUntil(this::printUser)
.contextWrite(context -> context.put("myContext", "THIRD"))
.block();
userService.getUserWithContext(2)
.delayUntil(this::printUser)
.contextWrite(context -> context.put("myContext", "FORTH"))
.block();
}
private Mono<Object> printUser(User user) {
return Mono.deferContextual(context -> {
System.out.printf("Found %s, with myContext: %s %n", user, context.get("myContext"));
return Mono.empty();
});
}
}
The output is:
Fetched for ID: 1, with myContext: N/A
Found User[id=1, name=Alex], with myContext: FIRST
Found User[id=1, name=Alex], with myContext: SECOND
Fetched for ID: 2, with myContext: THIRD
Found User[id=2, name=Betty], with myContext: THIRD
Found User[id=2, name=Betty], with myContext: FORTH
As you can see, the cache works as expected. Each ID is fetched from the "database" only once.
ID=1 was fetched without the context handling,
ID=2 has the context properly handled.
The UserService
class uses Caffeine AsyncCache
.
cache.get()
has 2 parameters:
class UserService {
private final UserRepository userRepository = new UserRepository();
private final AsyncCache<Integer, User> cache = Caffeine.newBuilder().buildAsync();
public Mono<User> getUserWithoutContext(Integer userKey) {
return Mono.deferContextual(contextView -> Mono.fromCompletionStage(cache.get(userKey,
// Executed when the key is not in the cache yet:
(key, executor) -> userRepository.fetchUser(key)
.toFuture()
)));
}
public Mono<User> getUserWithContext(Integer userKey) {
return Mono.deferContextual(contextView -> Mono.fromCompletionStage(cache.get(userKey,
// Executed when the key is not in the cache yet:
(key, executor) -> userRepository.fetchUser(key)
// Copy the context.
// Notice that the call of contextWrite() must be AFTER the call of fetchUser(),
// as it is propagated upstream and not downstream.
.contextWrite(context -> context.put("myContext", contextView.get("myContext")))
.toFuture()
)));
}
}
UserRepository
is just a simulated database repository.
In the fetchUser()
method we inspect the context and print it out to demonstrate if it has or has not been properly propagated.
class UserRepository {
// Simulated database table
final private Map<Integer, User> users = Map.of(
1, new User(1, "Alex"),
2, new User(2, "Betty")
);
public Mono<User> fetchUser(Integer userKey) {
return Mono.just(userKey)
// Let's check what is in the context
.transformDeferredContextual((integerMono, contextView) -> {
System.out.printf("Fetched for ID: %s, with myContext: %s %n",
userKey, contextView.getOrDefault("myContext", "N/A"));
return integerMono;
})
// Simulate the database fetch
.mapNotNull(users::get);
}
}
User
is our business object:
record User(Integer id, String name) {}