I'm testing Spring Boot 3 with spring-kafka. I would like to be notified (similar to a callback) when the sent message has been processed (when the acknowledgment is executed in the consumer). Currently, the message 'Message sent successfully' is being printed without waiting for the message ACK. Here's the code I'm using:
MessageController:
@RestController
@RequestMapping("/messages")
@RequiredArgsConstructor
@Slf4j
public class MessageController {
private final KafkaTemplate<Object, String> kafkaTemplate;
@GetMapping
public void sendMessage() {
CompletableFuture<SendResult<Object, String>> future = kafkaTemplate.send("message-topic", "message");
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("Message sent successfully {}", result);
} else {
log.error("Error sending message", ex);
}
});
}
@KafkaListener(id = "message-topic-listener", topics = "message-topic")
public void messageListener(@Payload String message, Acknowledgment acknowledgment) throws InterruptedException {
Thread.sleep(10000L);
log.info("Processing message {}", message);
acknowledgment.acknowledge();
}
}
application.yml
spring:
kafka:
producer:
bootstrap-servers: localhost:29092
listener:
ack-mode: MANUAL
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
bootstrap-servers: localhost:29092
Believe I have found the answer to my own question above, the 'whenComplete' method of the 'SendResult' class is executed when the message is successfully written/replicated to the partition, not on the consumer's commit event.