spring-bootapache-kafkaspring-kafka

Cannot resolve method 'addCallback' in 'CompletableFuture'


I have the following Kafka client which I want to migrate to Spring Boot 3:

@Service
public class KafkaProducer<K, V> {
  
  private final KafkaTemplate<K, V> kafkaTemplate;

  public KafkaProducer(KafkaTemplate<K, V> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  public void send(String topic, K key, V message, ListenableFutureCallback callback) {
    ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
    if (Objects.nonNull(callback)) {
      future.addCallback(callback);
    }
  }
}

I get an error:

Required type: ListenableFuture <SendResult<K, V>> 
Provided: CompletableFuture <SendResult<K, V>>

I changed the code to:

CompletableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
    if (Objects.nonNull(callback)) {
      future.addCallback(callback);
    }

But now I get:

Cannot resolve method 'addCallback' in 'CompletableFuture'

Do you know what should be the proper way to migrate this code?


Solution

  • The JavaDoc of ListenableFutureCallback is clear:

    Deprecated.

    as of 6.0, in favor of CompletableFuture.whenComplete(BiConsumer)

    public void send(String topic, K key, V message, BiConsumer<SendResult<K, V>, Throwable> callback) {
        CompletableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
        if (Objects.nonNull(callback)) {
            future.whenComplete(callback);
        }
    }
    

    It is a bit cumbersome, so I suggest defining this:

    public interface Callback<K, V> extends BiConsumer<SendResult<K, V>, Throwable> {
    }
    
    public void send(String topic, K key, V message, Callback<K, V> callback) {
    ...
    }