I have an application that reads from kafka using reactive kafka. Below is the code to read from kafka:
public Flux<String> readFromKafka() {
return kafkaConsumerTemplate
.receiveAutoAck()
.doOnNext(consumerRecord -> log.debug("received key={}, value={} from topic={}, offset={}",
consumerRecord.key(),
consumerRecord.value(),
consumerRecord.topic(),
consumerRecord.offset())
)
.doOnNext(s-> processData(s))
.map(ConsumerRecord::value)
.doOnNext(c -> log.debug("successfully consumed c{}", c))
.doOnError(exception -> log.error("Error occurred while processing the message, attempting retry. Error message: {} {}", exception.getMessage(), exception))
.retryWhen(Retry.backoff(Integer.parseInt(retryAttempts), Duration.ofSeconds(Integer.parseInt(retryAttemptsDelay))).transientErrors(true))
.onErrorResume(exception -> {
log.error("Kafka read retries exhausted : {} {}",exception.getMessage(), exception.toString());
return Flux.empty();
});
}
I have a requirement wherein I need to pause this read at a scheduled time for a task to process something and resume after the task is complete. This is the code I have added
public void pauseKafkaRead() {
List<TopicPartition> list = kafkaConsumerTemplate.assignment().collectList().block();
for (TopicPartition partition: list){
log.info("Partition: " + partition.toString());
kafkaConsumerTemplate.pause(partition);
}
}
public void resumeKafkaRead() {
List<TopicPartition> list = kafkaConsumerTemplate.assignment().collectList().block();
for (TopicPartition partition: list) {
log.info("Partition: " + partition.toString());
kafkaConsumerTemplate.resume(partition);
}
}
I call the pauseKafkaRead() method when I start with the task and call resume once the task is complete. However, this doesn't seem to pause the read. The data is continued to be read from Kafka and processed. Would someone be able to help me understand what I am missing here?
Kafka on pause or resume is offering you a mono so you need to subscribe in that stream in order to make it execute. You can try like this
public void pauseKafkaRead() {
kafkaConsumerTemplate.assignment()
.flatMap(o -> kafkaConsumerTemplate.pause(o))
.subscribe();
}
public void resumeKafkaRead() {
kafkaConsumerTemplate.assignment()
.flatMap(o -> kafkaConsumerTemplate.resume(o))
.subscribe();
}