javaspringspring-bootapache-kafka-streamsrequestcontext

Accessing StreamListener headers from RequestContext or similar


I have a service which calls a dozen other services. This reads from a Kafka topic using a @StreamListener in a controller class. For traceability purposes, the same headers(original request ID) from the Kafka message need to be forwarded to all the other services as well

Traditionally, with a @PostMapping("/path") or GetMapping, a request context is generated, and one can access the headers from anywhere using RequestContextHolder.currentRequestAttributes() and I would just pass the HttpHeaders object into a RequestEntity whenever I need to make an external call

However in a StreamListener, no request context is generated and trying to access the RequestContextHolder results in an exception

Here's an example of what I tried to do, which resulted in an exception:

public class Controller {
  @Autowired Service1 service1
  @Autowired Service2 service2

  @StreamListener("stream")
  public void processMessage(Model model) {
    service1.execute(model);
    service2.execute(model);
  }
}

public class Service {
  RestTemplate restTemplate;

  public void execute(Model model){
    // Do some stuff

    HttpHeaders httpHeaders = RequestContextHolder.currentRequestAttributes().someCodeToGetHttpHeaders();
    HttpEntity<Model> request = new HttpEntity(model, httpHeaders);
    restTemplate.exchange(url, HttpMethod.POST, request, String.class);
  }
}

My current workaround is to change the StreamListener to a PostMapping and have another PostMapping which calls that so a request context can be generated. Another option was to use a ThreadLocal but it seems just as janky

I'm aware of the @Headers MessageHeaders annotation to access the stream headers, however, this isn't accessible easily without passing the headers down to each and every service and would affect many unit tests

Ideally, I need a way to create my own request context (or whatever the proper terminology is) to have a place to store request scoped objects (the HttpHeader) or another thread safe way to have request headers passed down the stack without adding a request argument to service.execute


Solution

  • I've found a solution and am leaving it here for anyone else trying to achieve something similar

    If your goal is to forward a bunch of headers end-to-end through REST controllers and Stream listeners, you might want to consider using Spring Cloud Sleuth

    Add it to your project through your maven or gradle configuration:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-sleuth</artifactId>
    </dependency>
    

    Specifically, in Spring Cloud Sleuth there is a feature to forward headers or "baggage" by setting the property spring.sleuth.propagation-keys in your application.properties. These key-value pairs are persisted through the entire trace, including any downstream http or stream calls which also implement the same propagation keys

    If these fields need to be accessed on a code level, you can get and set them using the ExtraFieldPropagation static functions:

    ExtraFieldPropagation.set("country-code", "FO"); // Set
    String countryCode = ExtraFieldPropagation.get("country-code"); // Get
    

    Note that the ExtraFieldPropagation setter cannot set a property not present in the defined spring.sleuth.propagation-keys so arbitrary keys won't be accepted

    You can read up on the documentation for more information