apache-kafkakcat

How copy some message from one kafka topic to another from bash?


pls help

We have 2 kafka topic. I want copy 10 message from beginning from topic1 to topic2.

I`m try do it with kafka-console-consumer and kafka-console-producer

First i save 10 message from topic1 to some directory:

for (( i=1; i<=10; i++ )); do bin/kafka-console-consumer.sh --bootstrap-server 1.1.2.3:9092 --group CONSUMER1 --topic TOPIC1 --max-messages 1 > /tmp/_topic/$i.msg; done;

then i try with kafka-console-producer send it to topic2:

for (( i=1; i<=10; i++ )); do  bin/kafka-console-producer.sh --broker-list 1.1.2.4:9092 --topic TOPIC2 < /tmp/_topic/$i.msg; done;

And i got error - my service Can't deserialize data. My question is:

  1. does my solution will work ?
  2. Why i can reciev this error ?
  3. What is best way to copy message from one topic to another once ?

UPD: How i`m solve this problem (thanks: Robin Moffatt): I using kafka-mirror and this jar : https://github.com/opencore/mirrormaker_topic_rename with this i can copy message from one topic kafka to another on one cluster


Solution

  • You can do this with kafkacat:

    kafkacat -b localhost:9092 -C -t source-topic -K: -e -o beginning -c10 | \
    kafkacat -b localhost:9092 -P -t target-topic -K: 
    

    Note that this won't work if you've got binary data (e.g. Avro). To properly do this use something like Replicator or MirrorMaker2 and a ByteArrayConverter.