I need to set up custom tracing and logging. Logging is Logback (SLF4J).
I have a service that needs to read fields from the request headers and add them to the correlation data (before switching to Spring Boot 3, I used MDC to distribute this data). I also added custom fields to MDC for logging.
After switching to Spring Boot 3 (from Sleuth to Micrometer), I don't understand how I can manage data in MDC.
Can you tell me how to correctly add custom fields to the correlation? How do I populate MDC when using Micrometer? How do I transfer everything from the parent thread to a new one, and clean it up after completion?
Here's my example:
logging:
  pattern:
    console: "%clr(%d){faint} %clr(%5p) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%X{testRemoteBaggage}-%X{testLocalBaggage}]){magenta} %clr([%40.40t]){faint} %clr(${LOG_CORRELATION_PATTERN:-}){faint}%clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n%wEx"
management:
  tracing:
    sampling:
      probability: 1.0
    baggage:
      correlation:
        fields: testRemoteBaggage, testLocalBaggage
      remote-fields: testRemoteBaggage
      local-fields: testLocalBaggage
spring:
  threads:
    virtual:
      enabled: true
@EnableAsync
@Configuration(proxyBeanMethods = false)
public class DummyJsonTracingConfiguration {

    @Bean
    public AsyncTaskExecutor applicationTaskExecutor() {
        return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
    }

    @Bean
    public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() {
        return protocolHandler -> {
            protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
        };
    }
}
@Component
@Slf4j
@RequiredArgsConstructor
public class AddBaggageFilter extends OncePerRequestFilter implements OrderedFilter {

    private static final String fieldRemote = "testRemoteBaggage";
    private static final String fieldLocal = "testLocalBaggage";

    private final Tracer tracer;

    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
            throws ServletException, IOException {
        String baggagefieldRemote = request.getHeader(fieldRemote);
        if (StringUtils.isEmpty(baggagefieldRemote)) {
            baggagefieldRemote = "autoValue" + fieldRemote;
        }
        String baggagefieldLocal = request.getHeader(fieldLocal);
        if (StringUtils.isEmpty(baggagefieldLocal)) {
            baggagefieldLocal = "autoValue" + fieldLocal;
        }
        log.warn("baggagefieldRemote value : [{}]", baggagefieldRemote);
        log.warn("baggagefieldLocal value : [{}]", baggagefieldLocal);
        try (BaggageInScope baggageRemote = tracer.createBaggageInScope(fieldRemote, baggagefieldRemote);
             BaggageInScope baggageLocal = tracer.createBaggageInScope(fieldLocal, baggagefieldLocal)) {
            log.warn("baggagefieldRemote was created!");
            filterChain.doFilter(request, response);
        }
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 2;
    }
}
But in the logs I can see that the correlation is displayed only for the main (request-handling) thread, not for the ForkJoinPool worker threads:
2024-07-14 14:05:28,274 WARN 23716 --- [test1-] [ tomcat-handler-1] [6693bf08c4ecb0e29507c9dbc43570fe-9507c9dbc43570fe] b.g.t.rest.AddBaggageFilter : baggagefieldRemote value : [test1]
2024-07-14 14:05:28,275 WARN 23716 --- [test1-] [ tomcat-handler-1] [6693bf08c4ecb0e29507c9dbc43570fe-9507c9dbc43570fe] b.g.t.rest.AddBaggageFilter : baggagefieldLocal value : [autoValuetestLocalBaggage]
2024-07-14 14:05:28,276 WARN 23716 --- [test1-autoValuetestLocalBaggage] [ tomcat-handler-1] [6693bf08c4ecb0e29507c9dbc43570fe-9507c9dbc43570fe] b.g.t.rest.AddBaggageFilter : baggagefieldRemote was created!
2024-07-14 14:05:28,279 DEBUG 23716 --- [test1-autoValuetestLocalBaggage] [ tomcat-handler-1] [6693bf08c4ecb0e29507c9dbc43570fe-9507c9dbc43570fe] o.s.web.servlet.DispatcherServlet : POST "/categoryInfo", parameters={}
2024-07-14 14:05:28,285 DEBUG 23716 --- [test1-autoValuetestLocalBaggage] [ tomcat-handler-1] [6693bf08c4ecb0e29507c9dbc43570fe-9507c9dbc43570fe] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped to by.gvu.testtracingrestclient.controller.SimpleController#getListProducts()
2024-07-14 14:05:29,128 DEBUG 23716 --- [test1-autoValuetestLocalBaggage] [ tomcat-handler-1] [6693bf08c4ecb0e29507c9dbc43570fe-9507c9dbc43570fe] o.s.web.client.DefaultRestClient : Reading to [java.util.List<java.lang.String>]
2024-07-14 14:05:29,377 DEBUG 23716 --- [-] [ ForkJoinPool.commonPool-worker-9] [ ] o.s.web.client.DefaultRestClient : Reading to [java.util.List<by.gvu.testtracingrestclient.model.Product>]
2024-07-14 14:05:29,689 DEBUG 23716 --- [-] [ ForkJoinPool.commonPool-worker-10] [ ] o.s.web.client.DefaultRestClient : Reading to [java.util.List<by.gvu.testtracingrestclient.model.Product>]
2024-07-14 14:05:29,734 DEBUG 23716 --- [-] [ ForkJoinPool.commonPool-worker-4] [ ] o.s.web.client.DefaultRestClient : Reading to [java.util.List<by.gvu.testtracingrestclient.model.Product>]
2024-07-14 14:05:29,734 DEBUG 23716 --- [-] [ ForkJoinPool.commonPool-worker-12] [ ] o.s.web.client.DefaultRestClient : Reading to [java.util.List<by.gvu.testtracingrestclient.model.Product>]
UPDATED
The problem was not related to virtual threads.
As follows from the MRE the OP posted, the problem is with the execution of RestClient-powered requests on separate threads, in the OP's case threads managed by parallelStream/ForkJoinPool:
List<String> categories = ...
categories.parallelStream().map(category -> {
    List<Product> productList = restClient.post()
            .uri(uriBuilder -> uriBuilder.path("...").build(category))
            .retrieve()
            ...
RestClient does integrate with Micrometer Observation: upon execution of a request it starts an Observation, and that one retrieves the parent Observation from the ObservationRegistry. The ObservationRegistry, in turn, stores the current Observation (Observation.Scope) in its ThreadLocal. When RestClient is executed on a container (Tomcat) thread, ServerHttpObservationFilter, configured by Spring Boot, takes care of setting the current Observation in the registry; but when a thread managed by parallelStream/ForkJoinPool is about to execute RestClient, that thread has no access to the ThreadLocal of the Tomcat thread, and therefore no access to the initial (parent) Observation.
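To visualize the symptom, here is a minimal hypothetical snippet (the category list and log messages are illustrative only) inside a @Slf4j-annotated controller: the correlation fields present on the Tomcat thread are simply absent on the common-pool workers.
void demonstrate() {
    log.info("request thread");                        // correlation [traceId-spanId] present
    List.of("laptops", "smartphones").parallelStream()
            .forEach(c -> log.info("worker thread"));  // correlation fields empty
}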
This is a well-known Context Propagation problem.
The Micrometer framework anticipated this kind of issue and provides a dedicated library for it, Context Propagation, which
...assists with context propagation across different types of context mechanisms, such as ThreadLocal...
This library provides a special kind of ExecutorService, ContextExecutorService, which makes it possible to
propagate context to the task when it is executed.
ContextExecutorService wraps another ExecutorService via its static wrap method; let's use as the delegate the same kind of ForkJoinPool as parallelStream uses:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

import io.micrometer.context.ContextExecutorService;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PropagationConfiguration {

    @Bean
    public ExecutorService propagationExecutorService() {
        // Wrap a work-stealing (ForkJoin) pool so that a context snapshot is captured
        // at submission time and restored on the worker thread during task execution
        return ContextExecutorService.wrap(Executors.newWorkStealingPool(),
                (Supplier<ContextSnapshot>) () -> ContextSnapshotFactory.builder().build().captureAll());
    }
}
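Under the hood, captureAll() snapshots the values of all registered ThreadLocalAccessors (Micrometer registers one for the current Observation), and the wrapper restores that snapshot around every submitted task. If wrapping a whole pool is too coarse, the snapshot API can also be applied to individual tasks; a minimal sketch, assuming the same factory setup as above:
ContextSnapshot snapshot = ContextSnapshotFactory.builder().build().captureAll();
// wrap() returns a Runnable that restores the snapshot before the task body
// and reinstates the previous thread-local state afterwards
ForkJoinPool.commonPool().execute(snapshot.wrap(() -> log.info("runs with restored context")));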
Then the instance of this ExecutorService can be injected into a class that issues the requests, for example a Service:
@Autowired
private ExecutorService contextExecutorService;
and then used to submit requests in parallel.
Let's now apply this approach to the OP's snippet, staying in the same streams paradigm:
List<String> categories = ...
return categories.stream().map((category) -> {
    return contextExecutorService.submit(() -> {
        return new CategoryDetails(category, restClient.post()
                .uri(uriBuilder -> uriBuilder.path("...").build(category))
                .retrieve()
                .body(new ParameterizedTypeReference<List<Product>>() {}), null);
    });
})
.collect(Collectors.toList())
.stream()
.map((future) -> {
    try {
        return future.get();
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
})
.collect(Collectors.toList());
Note that the first collect and the subsequent stream() are necessary to allow parallel execution of the CategoryDetails retrieval.
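The same submit-all-then-join pattern can also be expressed with CompletableFuture; a sketch under the same assumptions, where fetchCategory is a hypothetical helper wrapping the RestClient call from the snippet above:
List<CategoryDetails> details = categories.stream()
        .map(category -> CompletableFuture.supplyAsync(() -> fetchCategory(category), contextExecutorService))
        .collect(Collectors.toList())   // materialize first, so all futures run concurrently
        .stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());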
Obviously, this is less elegant than the parallelStream approach; it seems, however, that there is no reliable way to use a custom Executor for parallelStreams.
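For completeness, the commonly suggested trick is to start the parallel stream from inside a task submitted to a custom ForkJoinPool, whose workers then execute the stream. This relies on undocumented ForkJoinTask behavior, and the pool cannot be the ContextExecutorService wrapper (which is a plain ExecutorService, not a ForkJoinPool), so the context would still not propagate. A sketch of the trick, shown only to illustrate why it is avoided here (processCategory is a hypothetical per-category action):
ForkJoinPool customPool = new ForkJoinPool(4);
try {
    // Undocumented: a parallel stream started inside a ForkJoinPool task runs on that pool
    customPool.submit(() -> categories.parallelStream().forEach(this::processCategory)).get();
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    customPool.shutdown();
}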
This approach, however, correctly propagates the Observation context to the ExecutorService-managed threads and further to the server that handles the requests. It might also be preferable to parallelStreams because, according to the "Parallel streams in Java: Benchmarking and performance considerations" blog, they
might not provide significant benefits for the I/O operations
It might still be possible to use the OP's original parallelStream approach, but that solution, which involves "manual" propagation of the Observation Scope to the parallelStream-controlled threads via the MDC context, is not recommended, being less scalable and MDC-coupled:
@Autowired
private ObservationRegistry registry;
...
final Scope currentScope = registry.getCurrentObservationScope();
final Map<String, String> mdcContext = MDC.getCopyOfContextMap();
categories.parallelStream().map(category -> {
    final Scope oldScope = registry.getCurrentObservationScope();
    try {
        // Restore the parent thread's MDC and Observation scope on the worker
        MDC.setContextMap(mdcContext);
        registry.setCurrentObservationScope(currentScope);
        restClient.post().uri(uriBuilder -> uriBuilder.path("...")
                .build(category))
                .retrieve()
        ...
    } finally {
        // Clean up so the pooled worker does not leak this request's context
        MDC.clear();
        registry.setCurrentObservationScope(oldScope);
    }
})
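Whichever variant is chosen, note the cleanup pattern above: ForkJoinPool workers are reused across requests, so the MDC map and the previous Observation scope must be restored in the finally block after each task; otherwise correlation data from one request leaks into log statements of unrelated tasks. The ContextExecutorService approach performs the equivalent restore automatically around every submitted task, which also answers the original "transfer to a new thread and clean up after completion" question.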