springspring-webfluxopentracingspring-webclient

Using WebClient to propagate request headers received in a Spring Webflux applications to downstream services


I have two kinds of Webflux applications, annotation-based and route-based. These applications are called with a set of headers, some of which (Open Tracing) I need to propagate in downstream calls using WebClient.

If these were normal Spring WebMvc applications I would use a Filter to keep the selected headers in a ThreadLocal, access it in a RestTemplate interceptor to send them to subsequent services and clear the ThreadLocal.

What's the proper way to replicate this behaviour in WebFlux applications?


Solution

  • I solved it using Project Reactor's Context to store the headers in a WebFilter. Then they are gotten in the WebClient's ExchangeFilterFunction. Here's the whole solution:

    WebFilter

    class OpenTracingFilter(private val openTracingHeaders: Set<String>) : WebFilter {
    
        private val logger = LoggerFactory.getLogger(javaClass)
    
        override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
    
            return chain.filter(exchange)
                    .subscriberContext { ctx ->
                        var updatedContext = ctx
                        exchange.request.headers.forEach {
                            if (openTracingHeaders.contains(it.key.toLowerCase())) {
                                logger.debug("Found OpenTracing Header - key {} - value {}", it.key, it.value[0])
                                updatedContext = updatedContext.put(it.key, it.value[0])
                            }
                        }
                        updatedContext
                    }
        }
    }
    

    OpenTracingExchangeFilterFunction

    class OpenTracingExchangeFilterFunction(private val headers: Set<String>) : ExchangeFilterFunction {
    
        private val logger = LoggerFactory.getLogger(javaClass)
    
        override fun filter(request: ClientRequest, next: ExchangeFunction): Mono<ClientResponse> {
    
            logger.debug("OpenTracingExchangeFilterFunction - filter()")
            return OpenTracingClientResponseMono(request, next, headers)
        }
    }
    

    OpenTracingClientResponseMono

    class OpenTracingClientResponseMono(private val request: ClientRequest,
                                        private val next: ExchangeFunction,
                                        private val headersToPropagate: Set<String>) : Mono<ClientResponse>() {
    
        private val logger = LoggerFactory.getLogger(javaClass)
    
        override fun subscribe(subscriber: CoreSubscriber<in ClientResponse>) {
            val context = subscriber.currentContext()
    
            val requestBuilder = ClientRequest.from(request)
            requestBuilder.headers { httpHeaders ->
                headersToPropagate.forEach {
                    if(context.hasKey(it)) {
                        logger.debug("Propagating header key {} - value{}", it, context.get<String>(it))
                        httpHeaders[it] = context.get<String>(it)
                    }
                }
            }
            val mutatedRequest = requestBuilder.build()
            next.exchange(mutatedRequest).subscribe(subscriber)
        }
    
    
    }
    

    OpenTracingConfiguration

    @Configuration
    class OpenTracingConfiguration(private val openTracingConfigurationProperties: OpenTracingConfigurationProperties) {
    
        @Bean
        fun webClient(): WebClient {
            return WebClient.builder().filter(openTracingExchangeFilterFunction()).build()
        }
    
        @Bean
        fun openTracingFilter(): WebFilter {
            return OpenTracingFilter(openTracingConfigurationProperties.headers)
        }
    
        @Bean
        fun openTracingExchangeFilterFunction(): OpenTracingExchangeFilterFunction {
            return OpenTracingExchangeFilterFunction(openTracingConfigurationProperties.headers)
        }
    }
    

    OpenTracingConfigurationProperties

    @Configuration
    @ConfigurationProperties("opentracing")
    class OpenTracingConfigurationProperties {
    
        lateinit var headers: Set<String>
    
    }
    

    application.yml

    opentracing:
      headers:
        - x-request-id
        - x-b3-traceid
        - x-b3-spanid
        - x-b3-parentspanid
        - x-b3-sampled
        - x-b3-flags
        - x-ot-span-context