The issue is that the trace ID and span ID are not printed in the consumer logs. I have two apps, Producer and Consumer, both on Spring Boot 3.
They communicate via Kafka, and both have the Micrometer Brave dependency. If I hit any REST endpoint in Consumer from any client, I get the trace and span IDs in the logs, but not when a Kafka message is consumed.
PRODUCER -->
From Producer I am able to send the trace and span IDs via Kafka
(in the producer factory I have this flag on - kafkaTemplate.setObservationEnabled(true);).
<dependency>
    <artifactId>spring-boot-starter-actuator</artifactId>
    <groupId>org.springframework.boot</groupId>
</dependency>
<dependency>
    <artifactId>micrometer-tracing-bridge-brave</artifactId>
    <groupId>io.micrometer</groupId>
</dependency>
KAFKA SERVER -->
Using ./kafka-console-consumer.sh, I can see the header being sent to the topic.
CONSUMER --> In the consumer logs I do not get the trace and span IDs. The headers of the consumed record do contain them, though; please see the logs below for reference.
This is the listener -
@KafkaListener(containerFactory = "tracingKafkaConsumerFactory", topics = "tracingTopic3")
public void kafkaTracingListener(ConsumerRecords<String, Object> consumerRecords) {
    consumerRecords.forEach(consumerRecord -> {
        try {
            if (consumerRecord.headers() != null) {
                consumerRecord.headers().forEach(header -> {
                    LOGGER.info("Header key {}, Header value {}", header.key(), new String(header.value()));
                });
            }
            TraceDTO traceDto = objectMapper.readValue((String) consumerRecord.value(), TraceDTO.class);
            LOGGER.info("consumer record {}", consumerRecord);
        } catch (Exception e) {
            LOGGER.error("Exception :", e);
        }
    });
    LOGGER.info("Is trace printed here? ");
}
2024-08-02T11:42:07.483+05:30 INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [ ] c.e.trace.demo.TracingDemoKafkaConsumer : Header key traceparent, Header value 00-66ac78b618581c0549d46cbed2dfa9dc-9675e35b152ee946-01
2024-08-02T11:42:07.484+05:30 INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [ ] c.e.trace.demo.TracingDemoKafkaConsumer : Header key __TypeId__, Header value com.srs.chargesessionmanagement.tracetest.TraceDTO
2024-08-02T11:42:07.537+05:30 INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [ ] c.e.trace.demo.TracingDemoKafkaConsumer : data TraceDTO(id=11, data=eleven)
2024-08-02T11:42:07.571+05:30 INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [ ] c.e.trace.demo.TracingDemoKafkaConsumer : consumer record ConsumerRecord(topic = tracingTopic3, partition = 1, leaderEpoch = 0, offset = 2, CreateTime = 1722579126973, serialized key size = 3, serialized value size = 25, headers = RecordHeaders(headers = [RecordHeader(key = traceparent, value = [48, 48, 45, 54, 54, 97, 99, 55, 56, 98, 54, 49, 56, 53, 56, 49, 99, 48, 53, 52, 57, 100, 52, 54, 99, 98, 101, 100, 50, 100, 102, 97, 57, 100, 99, 45, 57, 54, 55, 53, 101, 51, 53, 98, 49, 53, 50, 101, 101, 57, 52, 54, 45, 48, 49]), RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 115, 114, 115, 46, 99, 104, 97, 114, 103, 101, 115, 101, 115, 115, 105, 111, 110, 109, 97, 110, 97, 103, 101, 109, 101, 110, 116, 46, 116, 114, 97, 99, 101, 116, 101, 115, 116, 46, 84, 114, 97, 99, 101, 68, 84, 79])], isReadOnly = false), key = key, value = {"id":11,"data":"eleven"})
2024-08-02T11:42:07.571+05:30 INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [ ] c.e.trace.demo.TracingDemoKafkaConsumer : Is trace printed here?
Consumer config -
@EnableKafka
@Configuration
public class TraceDemoKafkaConsumerConfig {

    @Bean
    public Map<String, Object> tracingKafkaConsumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tracing-group-3");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public DefaultKafkaConsumerFactory<String, Object> tracingKafkaConsumerDefaultFactory() {
        return new DefaultKafkaConsumerFactory<>(tracingKafkaConsumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> tracingKafkaConsumerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(tracingKafkaConsumerDefaultFactory());
        factory.setConcurrency(2);
        factory.setBatchListener(true);
        factory.getContainerProperties().setObservationEnabled(true);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
        return factory;
    }
}
application.yaml -
management:
  tracing:
    sampling:
      probability: 1
I was trying to work out why this wasn't working. On looking at the code more closely, it makes sense that the trace and span IDs are not shown in the logs.
In the approach from the question, I am polling a batch of consumer records -
public void kafkaTracingListener(ConsumerRecords<String, Object> consumerRecords){
Each consumer record in the poll can carry a different trace and span ID, and I process all of them on a single thread within one listener call, so there is no single trace context that could be set for that call automatically.
I tried a separate approach, with a listener that takes the payload of a single record, and there the trace and span IDs were automatically set in the logging context and were visible in the logs -
@KafkaListener(containerFactory = "tracingKafkaConsumerFactory2", topics = "tracingTopic3", groupId = "two")
public void kafkaTraceListener2(@Payload String data, @Header("traceparent") String trace){
log.info("data {}, trace {}", data, trace);
}
In the above scenario the log shows the trace and span IDs -
2024-08-02T17:19:10.233+05:30 INFO 71027 --- [trace-demo] [ntainer#0-1-C-1] [66acc7b53c5cd46e12694ac72b08f513-85b79a077e41c571] c.e.trace.demo.TracingDemoKafkaConsumer : data {"id":11,"data":"eleven"}, trace 00-66acc7b53c5cd46e12694ac72b08f513-60545375088bedb3-01
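If the batch listener has to stay, one option is to read the trace information from each record's traceparent header by hand. The following is only a sketch, not something Spring Kafka or Micrometer provides: TraceParentParser, TraceParent, fromBytes and parse are hypothetical names, and the only thing assumed is the version-traceid-parentid-flags layout visible in the header value logged above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Hypothetical helper (not part of any library): splits a traceparent header value into its IDs. */
final class TraceParentParser {

    /** Trace ID and span (parent) ID carried by one traceparent header. */
    record TraceParent(String traceId, String spanId) {}

    private TraceParentParser() {}

    /** Parses the raw header bytes, e.g. what header.value() returns; empty if absent or malformed. */
    static Optional<TraceParent> fromBytes(byte[] headerValue) {
        if (headerValue == null) {
            return Optional.empty();
        }
        return parse(new String(headerValue, StandardCharsets.UTF_8));
    }

    /** Parses a value such as 00-<32 hex chars>-<16 hex chars>-01; empty if absent or malformed. */
    static Optional<TraceParent> parse(String headerValue) {
        if (headerValue == null) {
            return Optional.empty();
        }
        // Layout: version-traceid-parentid-flags
        String[] parts = headerValue.trim().split("-");
        if (parts.length < 4 || parts[1].length() != 32 || parts[2].length() != 16) {
            return Optional.empty();
        }
        return Optional.of(new TraceParent(parts[1], parts[2]));
    }
}
```

Inside the forEach loop of the batch listener, the result could be put into the SLF4J MDC before logging and removed again after each record, for example with MDC.put("traceId", tp.traceId()) and MDC.remove("traceId"). The key names are an assumption and have to match whatever the logging pattern reads. This only correlates log lines with the incoming trace; it does not start a span for the record.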