I configured my application to load beans based on properties in this way:
// Registered only when fleexi.kafka.override-concurrency.enabled is set (and not "false")
@Bean
@ConditionalOnProperty(
        prefix = "fleexi.kafka.override-concurrency",
        value = {"enabled"}
)
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryWithConcurrency(
        ConsumerFactory<String, String> kafkaConsumerFactory,
        FleexiKafkaProperties fleexiKafkaProperties) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(fleexiKafkaProperties.getOverrideConcurrency().getListenerConcurrency());
    factory.getContainerProperties().setObservationEnabled(true);
    factory.setConsumerFactory(kafkaConsumerFactory);
    return factory;
}

// Fallback, registered only when no other ConcurrentKafkaListenerContainerFactory bean exists
@Bean
@ConditionalOnMissingBean({ConcurrentKafkaListenerContainerFactory.class})
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setObservationEnabled(true);
    factory.setConsumerFactory(kafkaConsumerFactory);
    return factory;
}
In the first case, with concurrency enabled, the traceId and spanId are not logged, so observability doesn't work.
In the second case, without concurrency, it works properly.
Does concurrency break observability?
EDIT: An example is here: https://github.com/claudiomerli/demo-issue-spring-kafka-3459
Your problem is here:
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryWithConcurrency(
The default containerFactory for the @KafkaListener is the bean named kafkaListenerContainerFactory, not this one.
So, since your custom kafkaListenerContainerFactoryWithConcurrency is never used by the listener, it is no surprise that traces are not logged.
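To make the name lookup concrete, here is a toy model in plain Java. It is not Spring code: FactoryLookupSketch, Factory, buildRegistry and resolveForListener are hypothetical names, the concurrency value 10 is just an example, and the "auto-configured" entry only approximates my understanding that Spring Boot registers its own kafkaListenerContainerFactory (with observation off by default) unless a bean with that name already exists.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of bean-name lookup; none of these types are Spring classes.
class FactoryLookupSketch {

    // Name a @KafkaListener falls back to when no containerFactory is given.
    static final String DEFAULT_NAME = "kafkaListenerContainerFactory";

    // Stand-in for a container factory: only the two settings of interest.
    static final class Factory {
        final int concurrency;
        final boolean observationEnabled;

        Factory(int concurrency, boolean observationEnabled) {
            this.concurrency = concurrency;
            this.observationEnabled = observationEnabled;
        }
    }

    // Builds the registry as it might look with the override property enabled.
    // customBeanName is the name the custom factory bean is registered under.
    static Map<String, Factory> buildRegistry(String customBeanName) {
        Map<String, Factory> beans = new LinkedHashMap<>();
        beans.put(customBeanName, new Factory(10, true));
        // Assumption: the auto-configured factory is added only if the default
        // name is still free, and it has observation disabled by default.
        beans.putIfAbsent(DEFAULT_NAME, new Factory(1, false));
        return beans;
    }

    // What a listener without an explicit containerFactory would receive.
    static Factory resolveForListener(Map<String, Factory> beans) {
        return beans.get(DEFAULT_NAME);
    }

    public static void main(String[] args) {
        Factory before = resolveForListener(
                buildRegistry("kafkaListenerContainerFactoryWithConcurrency"));
        Factory after = resolveForListener(buildRegistry(DEFAULT_NAME));
        System.out.println("custom name, observation enabled: " + before.observationEnabled);
        System.out.println("default name, observation enabled: " + after.observationEnabled);
    }
}
```

In this sketch the listener only ever asks for the default name, so a factory registered under any other name is simply never consulted, regardless of how it is configured.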
Try just these configuration properties instead of those beans:
spring.kafka.consumer.group-id=TEST
spring.kafka.listener.concurrency=10
spring.kafka.listener.observation-enabled=true
UPDATE
Since you cannot rely on the auto-configuration and have to use those conditional beans, you can trick it like this:
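// Same bean as in the question, but registered under the default factory name
@Bean("kafkaListenerContainerFactory")
@ConditionalOnProperty(
        prefix = "fleexi.kafka.override-concurrency",
        value = {"enabled"}
)
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryWithConcurrency(
        ConsumerFactory<String, String> kafkaConsumerFactory,
        FleexiKafkaProperties fleexiKafkaProperties) {
    // Body unchanged from the question
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(fleexiKafkaProperties.getOverrideConcurrency().getListenerConcurrency());
    factory.getContainerProperties().setObservationEnabled(true);
    factory.setConsumerFactory(kafkaConsumerFactory);
    return factory;
}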
That way the bean gets the default factory name expected by the @KafkaListener.