java, spring-boot, spring-kafka, micrometer-tracing

Spring Boot Kafka Consumer Not Printing TraceId-SpanId in Logs for Messages Received from a Topic


The issue is that the traceId and spanId are not printed in the consumer logs. I have two apps, a Producer and a Consumer, both on Spring Boot 3.

They communicate via Kafka, and both have the Micrometer Brave tracing dependency. If I hit any REST endpoint on the Consumer via any client, I get the trace and span in the logs, but not when a Kafka message is consumed.

PRODUCER --> From the Producer I am able to send the traceId and spanId via Kafka (on the KafkaTemplate I have this flag on: kafkaTemplate.setObservationEnabled(true);). These are the dependencies:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
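
For reference, a minimal producer configuration along these lines might look like the following. This is a sketch; the bean names, serializers, and bootstrap address are my assumptions, not from the original post:

@Configuration
public class TraceDemoKafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> tracingKafkaProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // JsonSerializer also writes the __TypeId__ header seen in the consumer logs
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> tracingKafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(tracingKafkaProducerFactory());
        // With observation enabled, sends are traced and the traceparent header is propagated
        kafkaTemplate.setObservationEnabled(true);
        return kafkaTemplate;
    }
}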

KAFKA SERVER --> Using ./kafka-console-consumer.sh I am able to see the traceparent header being sent to the topic.
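
An invocation along these lines prints the record headers (assuming Kafka 2.7+, where print.headers is a DefaultMessageFormatter property; the exact command is not in the original post):

./kafka-console-consumer.sh --bootstrap-server localhost:9094 \
    --topic tracingTopic3 --from-beginning \
    --property print.headers=true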

CONSUMER --> In the consumer logs I am not able to get the trace and span. The consumer does receive the trace and span in the record headers, though. Please see the simple logs below for reference.

This is the listener -

@KafkaListener(containerFactory = "tracingKafkaConsumerFactory", topics = "tracingTopic3")
    public void kafkaTracingListener(ConsumerRecords<String, Object> consumerRecords){
        consumerRecords.forEach(consumerRecord ->{
            try {
                if(consumerRecord.headers()!=null) {
                    consumerRecord.headers().forEach(header -> {
                        LOGGER.info("Header key {}, Header value {}", header.key(), new String(header.value()));
                    });
                }
                TraceDTO traceDto = objectMapper.readValue((String) consumerRecord.value(), TraceDTO.class);
                LOGGER.info("consumer record {}", consumerRecord);
            } catch (Exception e) {
                LOGGER.error("Exception :", e);
            }
        });
        LOGGER.info("Is trace printed here? ");
    }

2024-08-02T11:42:07.483+05:30  INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [               ] c.e.trace.demo.TracingDemoKafkaConsumer  : Header key traceparent, Header value 00-66ac78b618581c0549d46cbed2dfa9dc-9675e35b152ee946-01
2024-08-02T11:42:07.484+05:30  INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [               ] c.e.trace.demo.TracingDemoKafkaConsumer  : Header key __TypeId__, Header value com.srs.chargesessionmanagement.tracetest.TraceDTO
2024-08-02T11:42:07.537+05:30  INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [               ] c.e.trace.demo.TracingDemoKafkaConsumer  : data TraceDTO(id=11, data=eleven) 
2024-08-02T11:42:07.571+05:30  INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [               ] c.e.trace.demo.TracingDemoKafkaConsumer  : consumer record ConsumerRecord(topic = tracingTopic3, partition = 1, leaderEpoch = 0, offset = 2, CreateTime = 1722579126973, serialized key size = 3, serialized value size = 25, headers = RecordHeaders(headers = [RecordHeader(key = traceparent, value = [48, 48, 45, 54, 54, 97, 99, 55, 56, 98, 54, 49, 56, 53, 56, 49, 99, 48, 53, 52, 57, 100, 52, 54, 99, 98, 101, 100, 50, 100, 102, 97, 57, 100, 99, 45, 57, 54, 55, 53, 101, 51, 53, 98, 49, 53, 50, 101, 101, 57, 52, 54, 45, 48, 49]), RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 115, 114, 115, 46, 99, 104, 97, 114, 103, 101, 115, 101, 115, 115, 105, 111, 110, 109, 97, 110, 97, 103, 101, 109, 101, 110, 116, 46, 116, 114, 97, 99, 101, 116, 101, 115, 116, 46, 84, 114, 97, 99, 101, 68, 84, 79])], isReadOnly = false), key = key, value = {"id":11,"data":"eleven"})
2024-08-02T11:42:07.571+05:30  INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [               ] c.e.trace.demo.TracingDemoKafkaConsumer  : Is trace printed here? 

Consumer config -

@EnableKafka
@Configuration
public class TraceDemoKafkaConsumerConfig {

    @Bean
    public Map<String, Object> tracingKafkaConsumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tracing-group-3");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public DefaultKafkaConsumerFactory<String, Object> tracingKafkaConsumerDefaultFactory() {
        return new DefaultKafkaConsumerFactory<>(tracingKafkaConsumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> tracingKafkaConsumerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(tracingKafkaConsumerDefaultFactory());
        factory.setConcurrency(2);
        factory.setBatchListener(true);
        factory.getContainerProperties().setObservationEnabled(true);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
        return factory;
    }
}

application.yaml -

management:
  tracing:
    sampling:
      probability: 1
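
An aside not in the original question: the bracketed trace-span field in the log lines above relies on log correlation being configured. Spring Boot 3.2+ adds it automatically when Micrometer Tracing is on the classpath; on earlier 3.x versions the documented pattern is:

logging:
  pattern:
    level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"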

Solution

  • So, I was trying to deduce why this wasn't working. On looking closely at the code, it actually makes sense that the trace-span does not show up in the logs.

    Basically, in the approach mentioned in the question, I am polling a batch of ConsumerRecords:

    public void kafkaTracingListener(ConsumerRecords<String, Object> consumerRecords){

    Now each consumer record in the poll can carry a different trace-spanId, and they are all processed on a single listener thread, so there is no single trace context the container could restore automatically. As I understand it, Spring Kafka's observation support opens an observation around each delivered record, which a batch listener bypasses, so the MDC is never populated.
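
    If the batch listener must be kept, one workaround (my sketch, not part of the original answer) is to restore each record's context manually inside the forEach, using the auto-configured io.micrometer.tracing Tracer and Propagator beans injected into the consumer class:

    // Rebuild the parent context from the record headers (e.g. the traceparent header)
    Span span = propagator.extract(consumerRecord.headers(),
                    (headers, key) -> {
                        Header header = headers.lastHeader(key);
                        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
                    })
            .name("tracingTopic3 record")
            .start();
    // Scope the span so traceId/spanId are pushed into the MDC for these log lines
    try (Tracer.SpanInScope ignored = tracer.withSpan(span)) {
        LOGGER.info("consumer record {}", consumerRecord);
    } finally {
        span.end();
    }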

    I tried a separate approach, and with it the trace-span was automatically set in the logging context and visible in the logs -

    @KafkaListener(containerFactory = "tracingKafkaConsumerFactory2", topics = "tracingTopic3", groupId = "two")
        public void kafkaTraceListener2(@Payload String data, @Header("traceparent") String trace){
            log.info("data {}, trace {}", data, trace);
        }
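
    The tracingKafkaConsumerFactory2 bean is not shown here; my assumption is that it is the same factory as above but without batch mode, along these lines:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> tracingKafkaConsumerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(tracingKafkaConsumerDefaultFactory());
        // Record (non-batch) listener: the container opens an observation per record,
        // restoring the trace context before the listener method runs
        factory.getContainerProperties().setObservationEnabled(true);
        return factory;
    }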
    

    In this scenario the log shows the trace-spanId -

    2024-08-02T17:19:10.233+05:30  INFO 71027 --- [trace-demo] [ntainer#0-1-C-1] [66acc7b53c5cd46e12694ac72b08f513-85b79a077e41c571] c.e.trace.demo.TracingDemoKafkaConsumer  : data {"id":11,"data":"eleven"}, trace 00-66acc7b53c5cd46e12694ac72b08f513-60545375088bedb3-01