I have the following Kafka client which I want to migrate to Spring Boot 3:
@Service
public class KafkaProducer<K, V> {
private final KafkaTemplate<K, V> kafkaTemplate;
public KafkaProducer(KafkaTemplate<K, V> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void send(String topic, K key, V message, ListenableFutureCallback callback) {
ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
if (Objects.nonNull(callback)) {
future.addCallback(callback);
}
}
}
I get an error:
Required type: ListenableFuture <SendResult<K, V>>
Provided: CompletableFuture <SendResult<K, V>>
I changed the code to:
CompletableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
if (Objects.nonNull(callback)) {
future.addCallback(callback);
}
But now I get:
Cannot resolve method 'addCallback' in 'CompletableFuture'
Do you know what should be the proper way to migrate this code?
The JavaDoc of ListenableFutureCallback
is clear:
Deprecated.
as of 6.0, in favor of
CompletableFuture.whenComplete(BiConsumer)
public void send(String topic, K key, V message, BiConsumer<SendResult<K, V>, Throwable> callback) {
CompletableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
if (Objects.nonNull(callback)) {
future.whenComplete(callback);
}
}
It is a bit cumbersome, so I suggest defining this:
public interface Callback<K, V> extends BiConsumer<SendResult<K, V>, Throwable> {
}
public void send(String topic, K key, V message, Callback<K, V> callback) {
...
}