I have a reactive Kafka application that reads data from one topic and writes to another. The source topic has multiple partitions, and I want to create as many consumers (in the same consumer group) as there are partitions. From what I understand from this thread, .receive() creates only one KafkaReceiver instance, which reads from all the partitions in the topic, so I would need multiple receivers to read from different partitions in parallel.
To do that I came up with the following code:
@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions(String topic, KafkaProperties kafkaProperties) {
    ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
    return basicReceiverOptions.subscription(Collections.singletonList(topic))
        .addAssignListener(receiverPartitions -> log.debug("onPartitionAssigned {}", receiverPartitions))
        .addRevokeListener(receiverPartitions -> log.debug("onPartitionsRevoked {}", receiverPartitions));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
    return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
}

@Bean
public ReactiveKafkaProducerTemplate<String, List<Object>> kafkaProducerTemplate(
        KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    return new ReactiveKafkaProducerTemplate<String, List<Object>>(SenderOptions.create(props));
}

// Subscribe once per partition so that each subscription gets its own consumer.
public void run(String... args) {
    for (int i = 0; i < topicPartitionsCount; i++) {
        readWrite(destinationTopic).subscribe();
    }
}

public Flux<String> readWrite(String destTopic) {
    return kafkaConsumerTemplate
        .receiveAutoAck()
        .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
            consumerRecord.key(),
            consumerRecord.value(),
            consumerRecord.topic(),
            consumerRecord.offset())
        )
        .doOnNext(consumerRecord -> log.info("Record received from partition {} in thread {}",
            consumerRecord.partition(), Thread.currentThread().getName()))
        .doOnNext(s -> sendToKafka(s, destTopic))
        .map(ConsumerRecord::value)
        .onErrorContinue((exception, errorConsumer) -> {
            log.error("Error while consuming : {}", exception.getMessage());
        });
}

public void sendToKafka(ConsumerRecord<String, String> consumerRecord, String destTopic) {
    kafkaProducerTemplate.send(destTopic, consumerRecord.key(), transformRecord(consumerRecord))
        .doOnNext(senderResult -> log.info("Record received from partition {} in thread {}",
            consumerRecord.partition(), Thread.currentThread().getName()))
        .doOnSuccess(senderResult -> {
            log.debug("Sent {} offset : {}", metrics, senderResult.recordMetadata().offset());
        })
        .doOnError(exception -> {
            log.error("Error while sending message to destination topic : {}", exception.getMessage());
        })
        .subscribe();
}
When I tested this, it seemed to work correctly: multiple KafkaReceiver instances are created, and they process the partitions in parallel. My question is: is this the most efficient way to create multiple instances? Is there another way to do this in reactive Kafka?
What you have done is correct.
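
To illustrate why the loop count should match the partition count, here is a minimal, Kafka-free sketch. It relies on one fact about consumer groups: each partition is read by at most one consumer in the group, so consumers beyond the partition count sit idle. The class PartitionSpreadSketch, the assign helper, and the round-robin rule are all hypothetical simplifications for illustration; Kafka's real assignors are more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionSpreadSketch {

    // Hypothetical helper, not a Kafka API: spreads partition ids
    // 0..partitions-1 over consumers 0..consumers-1 in round-robin order.
    // Assumes consumers >= 1.
    static Map<Integer, List<Integer>> assign(int partitions, int consumers) {
        Map<Integer, List<Integer>> byConsumer = new TreeMap<>();
        for (int c = 0; c < consumers; c++) {
            byConsumer.put(c, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            // Each partition goes to exactly one consumer in the group.
            byConsumer.get(p % consumers).add(p);
        }
        return byConsumer;
    }

    public static void main(String[] args) {
        // One consumer reads every partition: no parallelism.
        System.out.println(assign(4, 1)); // {0=[0, 1, 2, 3]}
        // As many consumers as partitions: one partition each.
        System.out.println(assign(4, 4)); // {0=[0], 1=[1], 2=[2], 3=[3]}
        // More consumers than partitions: the extras get nothing.
        System.out.println(assign(4, 6)); // {0=[0], 1=[1], 2=[2], 3=[3], 4=[], 5=[]}
    }
}
```

In this simplified model, subscribing topicPartitionsCount times, as in the run method of the question, corresponds to the middle case: every consumer ends up with exactly one partition.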