I'm looking for a way to redistribute messages from one Kafka topic into another. The original topic has 20 partitions with 1000000 messages per partition. I want to create a new topic with 1000 partitions and spread the messages across that wider partition range.
1T -> 20P -> 1000000 messages per partition (total 20m/topic)
2T -> 1000P -> 20000 messages per partition (total 20m/topic)
Is it possible to do that in Kafka (via topic mirroring or some other technique)?
You could use MirrorMaker (version 1), which ships with Kafka. This tool is mainly used to replicate data from one data center to another, and it is built on the assumption that topic names stay the same in both clusters.
However, you can provide a custom MirrorMakerMessageHandler that renames the topic.
package org.xxx.java;

import java.util.Collections;
import java.util.List;

import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * An example implementation of MirrorMakerMessageHandler that renames a topic.
 */
public class TopicRenameHandler implements MirrorMaker.MirrorMakerMessageHandler {

    private final String newName;

    // MirrorMaker passes the value of --message.handler.args to this constructor.
    public TopicRenameHandler(String newName) {
        this.newName = newName;
    }

    @Override
    public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
        // No partition is set, so the producer's partitioner chooses among all
        // partitions of the new topic. Passing record.partition() here would keep
        // every message in partitions 0-19, defeating the purpose of the new topic.
        return Collections.singletonList(
                new ProducerRecord<byte[], byte[]>(newName, record.key(), record.value()));
    }
}
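To show why the partition argument matters, here is a small standalone sketch. It re-implements, for illustration only, the formula Kafka's default partitioner applies to records with a non-null key (toPositive(murmur2(key)) % numPartitions) and counts how many target partitions a set of keys ends up in. The class name PartitionSpreadSketch and the synthetic keys key-0 to key-19999 are hypothetical, and the hash is a copy of the murmur2 variant from Kafka's Utils class so the example runs without Kafka on the classpath. It also assumes the source topic was itself written with the default partitioner.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: how keys spread over 20 vs 1000 partitions. */
public class PartitionSpreadSketch {

    // Copy of the murmur2 hash used by Kafka's default partitioner
    // (org.apache.kafka.common.utils.Utils.murmur2), for illustration.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition for a non-null key: toPositive(murmur2(key)) % numPartitions.
    static int keyHashPartition(byte[] key, int numPartitions) {
        return (murmur2(key) & 0x7fffffff) % numPartitions;
    }

    // Number of distinct partitions used by keys "key-0" .. "key-(numKeys-1)".
    static int distinctPartitions(int numKeys, int numPartitions) {
        Set<Integer> used = new HashSet<>();
        for (int i = 0; i < numKeys; i++) {
            byte[] key = ("key-" + i).getBytes(StandardCharsets.UTF_8);
            used.add(keyHashPartition(key, numPartitions));
        }
        return used.size();
    }

    public static void main(String[] args) {
        // Reusing record.partition(): each key stays where it was in the
        // 20-partition source topic, so at most 20 target partitions are used.
        System.out.println("reuse source partition: " + distinctPartitions(20_000, 20));
        // Omitting the partition: the key hash picks among all 1000 partitions.
        System.out.println("hash key over 1000:     " + distinctPartitions(20_000, 1000));
    }
}
```

Key hashing gives a roughly, not exactly, even spread, and all messages with the same key still land in the same partition. Records with a null key are not hashed at all; since Kafka 2.4 the default partitioner uses sticky partitioning for them, which also spreads them across all partitions over time.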
I used the following dependencies in my pom.xml file:
<properties>
    <kafka.version>2.5.0</kafka.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.13</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>
Build the code above into a JAR and add that JAR to the CLASSPATH:
export CLASSPATH=$CLASSPATH:/.../target/MirrorMakerRenameTopics-1.0.jar
Then, with a basic consumer.properties
bootstrap.servers=localhost:9092
client.id=mirror-maker-consumer
group.id=mirror-maker-rename-topic
auto.offset.reset=earliest
and producer.properties
bootstrap.servers=localhost:9092
client.id=mirror-maker-producer
you can run kafka-mirror-maker as shown below:
kafka-mirror-maker --consumer.config /path/to/consumer.properties \
--producer.config /path/to/producer.properties \
--num.streams 1 \
--whitelist="topicToBeRenamed" \
--message.handler org.xxx.java.TopicRenameHandler \
--message.handler.args "newTopicName"
Please note the following two caveats with this approach: