apache-kafka apache-kafka-connect apache-kafka-mirrormaker

MM2.0 consumer group behavior


I'm trying to run some tests to understand MM2 behavior. As part of that I had the following questions:

  1. How do I correctly set a custom consumer group for MM2 in mm2.properties?
    Based on this question, I tried setting <alias>.group.id=temp_cons_group in mm2.properties, and after restarting the MM2 instance I could see the consumer group mentioned in the MM2 logs.
    However, when I list the consumer groups registered on the source broker, the group doesn't show up.

  2. How can I test whether the property <alias>.consumer.auto.offset.reset works?
    Here, I want to consume the same messages again, so following the question mentioned above I set <source_alias>.consumer.auto.offset.reset to earliest and restarted MM2.
    I could see the property set correctly in the MM2 logs, but the messages were not mirrored again from the beginning into the target cluster topic.

  3. How do I start an MM2 instance so that it begins consuming from a specific offset of a topic in the source cluster?


Solution

    1. MirrorMaker does not use a consumer group. It assigns partitions to its consumers directly with the assign() API, which involves no group membership, so it's expected that you don't see a group.

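    2. It's hard to "test". One way to verify that this configuration was picked up is to check that it's present in the logs when MirrorMaker starts its consumers. Keep in mind that, as with any Kafka consumer, auto.offset.reset only applies when there is no stored position for a partition. After a restart MirrorMaker resumes from the source offsets it saved through Connect, so messages that were already mirrored are not expected to be replayed.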

    3. This is currently not trivial to do. There's a KIP in progress to improve the process but at the moment it requires manually updating the internal offset topic from your Connect instance. At a very high level, here's the process:

      First, ensure MirrorMaker is not running. Then you need to find the offset records for MirrorMaker in the offsets topic using a command like:

      ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic <CONNECT_OFFSET_TOPIC> \
        --from-beginning \
        --property print.key=true | grep <SOURCE_CONNECTOR_NAME>
      

      You will see records with offsets for each partition MirrorMaker handles. To update the offsets, you need to produce new records to this topic with the offsets you want. For each partition, ensure your record has the same key as the existing message so it replaces the existing stored offsets.
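
As a rough sketch of that last step, the snippet below builds one keyed record per partition and shows how it could be piped to the console producer. The helper name mm2_offset_record is made up for illustration, and the {"offset":N} value layout is an assumption: compare it with the values printed next to your existing keys before writing anything. The key should be pasted exactly as the console consumer printed it, not retyped.

```shell
# Hypothetical helper: build one "key|value" line for kafka-console-producer.
#   $1 = record key, copied verbatim from the console-consumer output
#   $2 = offset to store for that partition
# ASSUMPTION: the stored value has the form {"offset":N}; verify this against
# the values shown for your existing records.
mm2_offset_record() {
  printf '%s|{"offset":%d}\n' "$1" "$2"
}

# Example usage (replace the placeholders; keep MirrorMaker stopped):
# mm2_offset_record '<KEY_COPIED_FROM_CONSUMER_OUTPUT>' 42 |
#   ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
#     --topic <CONNECT_OFFSET_TOPIC> \
#     --property parse.key=true \
#     --property key.separator='|'
```

The "|" separator is chosen only because it cannot appear in a topic name; any separator that does not occur in your keys would work. Since the producer picks the partition from the key bytes, an exact copy of the key should land in the same partition as the original record, but it is worth re-running the consumer command above to confirm the new record shows up with the expected key before restarting MirrorMaker.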