apache-kafka  apache-kafka-mirrormaker  kafka-partition

How to distribute messages between Kafka topics with different configuration?


I'm looking for a way to distribute messages between two Kafka topics. The original topic has 20 partitions with 1,000,000 messages per partition. I want to create a new topic with 1000 partitions and spread the messages across this wider partition range.

1T -> 20P -> 1000000 messages per partition (total 20m/topic)
2T -> 1000P -> 20000 messages per partition (total 20m/topic)

Is it possible to do that in Kafka (via topic mirroring or some other technique)?


Solution

  • You could use MirrorMaker (version 1), which ships with Kafka. This tool is mainly used to replicate data from one data center to another, and it is built on the assumption that topic names stay the same in both clusters.

    However, you can provide a custom MessageHandler that renames the topic.

    package org.xxx.java;
    
    import java.util.Collections;
    import java.util.List;
    import kafka.consumer.BaseConsumerRecord;
    import kafka.tools.MirrorMaker;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    
    /**
     * An example implementation of MirrorMakerMessageHandler that renames the topic.
     */
    public class TopicRenameHandler implements MirrorMaker.MirrorMakerMessageHandler {
      private final String newName;
    
      public TopicRenameHandler(String newName) {
        this.newName = newName;
      }
    
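      @Override
      public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
        // No partition is passed, so the producer's partitioner picks one from the new
        // topic's full partition range instead of reusing the source partition number.
        return Collections.singletonList(
            new ProducerRecord<byte[], byte[]>(newName, record.key(), record.value()));
      }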
    }
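
    The partition argument of ProducerRecord decides where a mirrored record lands. Reusing the source partition number can only ever address partitions 0 to 19 of the new topic, whereas a record created without a partition is placed by the producer's partitioner. The sketch below illustrates the difference with plain arithmetic and no Kafka dependency. The class PartitionSpreadSketch and its methods are hypothetical, the message count is scaled down, and round-robin is only an assumed stand-in for the real partitioner, which hashes the key or uses sticky batching for records without a key.

```java
public class PartitionSpreadSketch {

  /**
   * Counts how many messages land in each target partition.
   * keepSourcePartition = true  models reusing the source partition number.
   * keepSourcePartition = false models leaving the partition unset; round-robin
   * stands in for the producer's partitioner here (an assumption, not Kafka's logic).
   * When keeping the source partition, targetPartitions must be >= sourcePartitions.
   */
  static int[] distribute(int sourcePartitions, int messagesPerSourcePartition,
                          int targetPartitions, boolean keepSourcePartition) {
    int[] counts = new int[targetPartitions];
    int next = 0; // round-robin cursor over the target partitions
    for (int p = 0; p < sourcePartitions; p++) {
      for (int m = 0; m < messagesPerSourcePartition; m++) {
        int target = keepSourcePartition ? p : next++ % targetPartitions;
        counts[target]++;
      }
    }
    return counts;
  }

  /** Number of target partitions that received at least one message. */
  static int usedPartitions(int[] counts) {
    int used = 0;
    for (int c : counts) {
      if (c > 0) {
        used++;
      }
    }
    return used;
  }

  public static void main(String[] args) {
    // Scaled down from the question: 20 x 1,000 instead of 20 x 1,000,000 messages.
    int[] kept = distribute(20, 1000, 1000, true);
    int[] spread = distribute(20, 1000, 1000, false);
    System.out.println("source partition kept: " + usedPartitions(kept) + " partitions used");
    System.out.println("partition left unset:  " + usedPartitions(spread) + " partitions used");
  }
}
```

    With these numbers the first case fills only 20 of the 1000 partitions with 1,000 messages each, while the second fills all 1000 with 20 messages each. A real key-hash partitioner would spread less evenly than this idealised round-robin.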
    

    I used the following dependencies in my pom.xml file:

        <properties>
            <kafka.version>2.5.0</kafka.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${kafka.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.13</artifactId>
                <version>${kafka.version}</version>
            </dependency>
        </dependencies>
    

    Compile the code above and add the resulting JAR to the CLASSPATH so that MirrorMaker can load your class:

    export CLASSPATH=$CLASSPATH:/.../target/MirrorMakerRenameTopics-1.0.jar
    

    Now, together with some basic consumer.properties

    bootstrap.servers=localhost:9092
    client.id=mirror-maker-consumer
    group.id=mirror-maker-rename-topic
    auto.offset.reset=earliest
    

    and producer.properties

    bootstrap.servers=localhost:9092
    client.id=mirror-maker-producer
    

    you can call kafka-mirror-maker as shown below:

    kafka-mirror-maker --consumer.config /path/to/consumer.properties \
     --producer.config /path/to/producer.properties \
     --num.streams 1 \
     --whitelist="topicToBeRenamed" \
     --message.handler org.xxx.java.TopicRenameHandler \
     --message.handler.args "newTopicName"
    

    Please note the following two caveats with this approach: