We have Spring Boot 2.7 + Kafka, and can consume messages thusly:
@KafkaListener(topics = "${kafka.topic.stuff}")
public void consume(@Payload String message) {
log.info("in kafka consumer");
// process event
}
However, we don't know if these are being process synchronously (so if we take too long processing one message, it will delay processing the next).
Usually, we can see if it's processing using new threads from the log output, but for some reason, this info is missing:
2025-01-06T17:40:00,143Z INFO pool-2-thread-1 c.c.g.scheduler.SomemScheduler [correlationToken:ANP-06c003a1-d31c-42f6-9d2d-a9cdb5bfde96] => some event scheduler started..
2025-01-06T17:40:07,721Z INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 c.c.g.s.cdm.consumer.MyConsumer [] => in kafka consumer
2025-01-06T17:40:14,665Z INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 org.apache.kafka.clients.NetworkClient [] => [Consumer clientId=consumer-SWH-1, groupId=SWH] Node -1 disconnected.
Here we can see that lines logged in our code show the thread name, but ones logged inside the Kafka listener don't, so we can't see if it's creating new threads or not.
In our logback.xml we have:
<pattern>%date{"yyyy-MM-dd'T'HH:mm:ss,SSSXXX", UTC} %-5level %thread %logger{42} [%X{correlationTokenKV}] => %msg%n</pattern>
The question is:
We include Kafka like this:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Your assumptions and observations are correct. The @KafkaListener
implementation is not async. The logic there is to process one partition in the same thread to preserve an order for record offsets.
You can make that async if you use CompletableFuture<Void>
as a return type of your @KafkaListener
.
See more info in docs: https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/async-returns.html