spring-bootspring-kafka

Is Spring Boot Kafka consumer asynchronous, and if not, how do we make it?


We have Spring Boot 2.7 + Kafka, and can consume messages thusly:

@KafkaListener(topics = "${kafka.topic.stuff}")
public void consume(@Payload String message) {
    log.info("in kafka consumer");
   // process event
}

However, we don't know if these are being process synchronously (so if we take too long processing one message, it will delay processing the next).

Usually, we can see if it's processing using new threads from the log output, but for some reason, this info is missing:

2025-01-06T17:40:00,143Z INFO  pool-2-thread-1 c.c.g.scheduler.SomemScheduler [correlationToken:ANP-06c003a1-d31c-42f6-9d2d-a9cdb5bfde96] => some event scheduler started..

2025-01-06T17:40:07,721Z INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 c.c.g.s.cdm.consumer.MyConsumer [] => in kafka consumer
2025-01-06T17:40:14,665Z INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 org.apache.kafka.clients.NetworkClient [] => [Consumer clientId=consumer-SWH-1, groupId=SWH] Node -1 disconnected.

Here we can see that lines logged in our code show the thread name, but ones logged inside the Kafka listener don't, so we can't see if it's creating new threads or not.

In our logback.xml we have:

<pattern>%date{"yyyy-MM-dd'T'HH:mm:ss,SSSXXX", UTC} %-5level %thread %logger{42} [%X{correlationTokenKV}] => %msg%n</pattern>

The question is:

  1. Is Kafka consumer aysnc?
  2. If not, how do we make it?
  3. Why is our standard log format not being used?
  4. How do we make Kafka use our log formatter, or at least output the thread?

We include Kafka like this:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

Solution

  • Your assumptions and observations are correct. The @KafkaListener implementation is not async. The logic there is to process one partition in the same thread to preserve an order for record offsets.

    You can make that async if you use CompletableFuture<Void> as a return type of your @KafkaListener.

    See more info in docs: https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/async-returns.html