javaspring-bootmicrometermdc

Spring Boot 3 micrometer tracing in MDC


I need to organize custom tracing and logging. Logging logback (Slf4j).

I have a service that needs to read fields from the header and add them to the correlation (before switching to spring boot 3, I used MDC to distribute data). Plus I also added custom fields to MDC for logging.

After switching to spring boot 3 (from sleuth to micrometer), I don’t understand how I can manage data in MDC.

Can you tell me how to correctly add custom fields to the correlation? How to fill out Mdc when using a micrometer. How to transfer everything from the parent thread to a new one, and clean it up after completion?

Here's my example:

  1. Set up a logging pattern:
logging:
  pattern:
    console: "%clr(%d){faint} %clr(%5p) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%X{testRemoteBaggage}-%X{testLocalBaggage}]){magenta} %clr([%40.40t]){faint} %clr(${LOG_CORRELATION_PATTERN:-}){faint}%clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n%wEx"
  1. Configured adding custom fields to the correlation (the first field should be sent with requests, the second should not):
management:
  tracing:
    sampling:
      probability: 1.0
    baggage:
      correlation:
        fields: testRemoteBaggage, testLocalBaggage
      remote-fields: testRemoteBaggage
      local-fields: testLocalBaggage
  1. Configured a task executor to work asynchronously with virtual threads:
spring:
  threads:
    virtual:
      enabled: true
@EnableAsync
@Configuration(proxyBeanMethods = false)
public class DummyJsonTracingConfiguration {

    @Bean
    public AsyncTaskExecutor applicationTaskExecutor() {
        return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
    }

    @Bean
    public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() {
        return protocolHandler -> {
            protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
        };
    }
}
  1. Added a filter that adds custom fields to the correlation:
@Component
@Slf4j
@RequiredArgsConstructor
public class AddBaggageFilter extends OncePerRequestFilter implements OrderedFilter {
    private static final String fieldRemote = "testRemoteBaggage";
    private static final String fieldLocal = "testLocalBaggage";

    private final Tracer tracer;

    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {

        String baggagefieldRemote = request.getHeader(fieldRemote);
        if (StringUtils.isEmpty(baggagefieldRemote)) {
            baggagefieldRemote = "autoValue"+fieldRemote;
        }
        String baggagefieldLocal = request.getHeader(fieldLocal);
        if (StringUtils.isEmpty(baggagefieldLocal)) {
            baggagefieldLocal = "autoValue"+fieldLocal;
        }
        log.warn("baggagefieldRemote value : [{}]", baggagefieldRemote);
        log.warn("baggagefieldLocal value : [{}]", baggagefieldLocal);

        try (BaggageInScope baggageRemote = tracer.createBaggageInScope(fieldRemote, baggagefieldRemote);
             BaggageInScope baggageLocal = tracer.createBaggageInScope(fieldLocal, baggagefieldLocal)) {
            log.warn("baggagefieldRemote was created!");
            filterChain.doFilter(request, response);
        }
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 2;
    }
}

But in the logs I see that the correlation is displayed only for the main thread.

2024-07-14 14:05:28,274  WARN 23716 --- [test1-] [                        tomcat-handler-1] [6693bf08c4ecb0e29507c9dbc43570fe-9507c9dbc43570fe] b.g.t.rest.AddBaggageFilter              : baggagefieldRemote value : [test1]
2024-07-14 14:05:28,275  WARN 23716 --- [test1-] [                        tomcat-handler-1] [6693bf08c4ecb0e29507c9dbc43570fe-9507c9dbc43570fe] b.g.t.rest.AddBaggageFilter              : baggagefieldLocal value : [autoValuetestLocalBaggage]
2024-07-14 14:05:28,276  WARN 23716 --- [test1-autoValuetestLocalBaggage] [                        tomcat-handler-1] [6693bf08c4ecb0e29507c9dbc43570fe-9507c9dbc43570fe] b.g.t.rest.AddBaggageFilter              : baggagefieldRemote was created!
2024-07-14 14:05:28,279 DEBUG 23716 --- [test1-autoValuetestLocalBaggage] [                        tomcat-handler-1] [6693bf08c4ecb0e29507c9dbc43570fe-9507c9dbc43570fe] o.s.web.servlet.DispatcherServlet        : POST "/categoryInfo", parameters={}
2024-07-14 14:05:28,285 DEBUG 23716 --- [test1-autoValuetestLocalBaggage] [                        tomcat-handler-1] [6693bf08c4ecb0e29507c9dbc43570fe-9507c9dbc43570fe] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped to by.gvu.testtracingrestclient.controller.SimpleController#getListProducts()
2024-07-14 14:05:29,128 DEBUG 23716 --- [test1-autoValuetestLocalBaggage] [                        tomcat-handler-1] [6693bf08c4ecb0e29507c9dbc43570fe-9507c9dbc43570fe] o.s.web.client.DefaultRestClient         : Reading to [java.util.List<java.lang.String>]
2024-07-14 14:05:29,377 DEBUG 23716 --- [-] [        ForkJoinPool.commonPool-worker-9] [                                                 ] o.s.web.client.DefaultRestClient         : Reading to [java.util.List<by.gvu.testtracingrestclient.model.Product>]
2024-07-14 14:05:29,689 DEBUG 23716 --- [-] [       ForkJoinPool.commonPool-worker-10] [                                                 ] o.s.web.client.DefaultRestClient         : Reading to [java.util.List<by.gvu.testtracingrestclient.model.Product>]
2024-07-14 14:05:29,734 DEBUG 23716 --- [-] [        ForkJoinPool.commonPool-worker-4] [                                                 ] o.s.web.client.DefaultRestClient         : Reading to [java.util.List<by.gvu.testtracingrestclient.model.Product>]
2024-07-14 14:05:29,734 DEBUG 23716 --- [-] [       ForkJoinPool.commonPool-worker-12] [                                                 ] o.s.web.client.DefaultRestClient         : Reading to [java.util.List<by.gvu.testtracingrestclient.model.Product>]

UPDATED

The problem was not related to virtual threads.


Solution

  • As it follows from the MRE the OP posted the problem is with execution of RestClient-powered requests on separate threads, in the OP case threads, managed by parallelStream/ForkJoinPool.

    List<String> categories = ...
    
    categories.parallelStream().map(category -> {
        List<Product> productList = restClient.post()
                            .uri(uriBuilder -> uriBuilder.path("...").build(category))
                            .retrieve()
    ...
    

    RestClient does integrate with Micrometer Observation: upon execution of a request it starts an Observation and that one retrieves parent Observation form ObservationRegistry. ObservationRegistry, in turn, stores current Observation (Observation.Scope) in its ThreadLocal. When RestClient is executed on a container (Tomcat) thread, ServerHttpObservationFilter , configured by Spring Boot, takes care of setting current Observation to the registry, but when a thread, managed by parallelStream/ForkJoinPool, is about to execute RestClient, this thread does not have an access to ThreadLocal of Tomcat thread, and therefore, to initial (parent) Observation.

    This is a well-known Context Propagation problem.

    Micrometer framework anticipated this kind of issues and provisioned special library for it: Context Propagation, its purpose is to

    ...assists with context propagation across different types of context mechanisms, such as ThreadLocal ...

    This library provides a special kind of ExecutorService, ContextExecutorService, which allows to

    propagate context to the task when it is executed.

    ContextExecutorService wraps other Executor in its static wrap method, let's use as delegatee same ForkJoinPool as used in parellelStream:

    @Configuration
    public class PropagationConfiguration {
    
        @Bean
        public ExecutorService propagationExecutorService() {
            return ContextExecutorService.wrap(Executors.newWorkStealingPool(), 
                (Supplier<ContextSnapshot>) () -> ContextSnapshotFactory.builder().build().captureAll());
        }
    }
    

    Then the instance of this ExecutorService could be injected into a class that issues the requests, for example, a Service:

    @Autowired
    private ExecutorService contextExecutorService;
    

    and then use this ExecutorService to submit requests in parallel.

    Let's now apply this approach to the OP's snippet, staying in the same streams paradigm:

        List<String> categories = ...
    
        return categories.stream().map( (category) -> {
                return contextExecutorService.submit( () -> {
                    return new CategoryDetails(category, restClient.post()
                            .uri(uriBuilder -> uriBuilder.path("...").build(category))
                            .retrieve()
                            .body(new ParameterizedTypeReference<List<Product>>() {}), null);
                });
            })
            .collect(Collectors.toList())
            .stream()
            .map( (future) -> {
                try {
                    return future.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            })
            .collect(Collectors.toList());
    

    Note that first collect and subsequent stream() are necessary to allow parallel execution of CategoryDetails retrieval.

    Obviously, this is less elegant than parellelStream approach; it seems, however, that there is no reliable way to use custom Executor for parellelStreams. This approach, however, correctly propagates Observation Context to ExecutorService-managed threads and further to the server that handles the requests. It might also be preferable to parellelStreams because according to Parallel streams in Java: Benchmarking and performance considerations blog they

    might not provide significant benefits for the I/O operations

    It still might be possible to use the OP's original parallelStream approach, but this solution, which involves "manual" propagation of Observation Scope to the parallelStream-controlled threads via MDC Context, is not recommended as less scalable and MDC-coupled:

    @Autowired
    private ObservationRegistry registry;
    
    ... 
    
    final Scope currentScope = registry.getCurrentObservationScope();
    final Map<String, String> mdcContext = MDC.getCopyOfContextMap();   
    categories.parallelStream().map(category -> {
        final Scope oldScope = observationRegistry.getCurrentObservationScope();
        try {
            MDC.setContextMap(mdcContext);            
            registry.setCurrentObservationScope(currentScope);
            restClient.post().uri(uriBuilder -> uriBuilder.path("...")
            .build(category))
            .retrieve()
            ...         
        } finally {
            MDC.clear();
            registry.setCurrentObservationScope(oldScope);
        }
    }