
Manual offset commit does not work as expected


I'm running confluent_kafka 2.0.2 and testing manual offset commits. I set up a confluentinc/cp-kafka:7.3.0 broker with a test topic that has 1 partition. I wrote a test script that uses 'enable.auto.offset.store': False and 'enable.auto.commit': False, but the consumer seems to commit the offset after every poll().

The test code is as follows:

from confluent_kafka import Consumer, TopicPartition


LOCAL_TEST_TOPIC = "test-topic2"
LOCAL_TEST_CONSUMER_CONFIG = {
    'bootstrap.servers': '0.0.0.0:9092',
    'group.id': "tail-grouptest",
    'enable.auto.offset.store': False,
    'enable.auto.commit': False,
}

consumer = Consumer(LOCAL_TEST_CONSUMER_CONFIG)
topics = [LOCAL_TEST_TOPIC, ]


def msg_process(msg):
    if not msg:
        print("EMPTY MESSAGE")
        return
    print(msg.value().decode('utf-8'))


def basic_consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)
        topic = consumer.list_topics(topic=LOCAL_TEST_TOPIC)
        partitions = [TopicPartition(LOCAL_TEST_TOPIC, partition)
                      for partition in topic.topics[LOCAL_TEST_TOPIC].partitions]

        # msg1
        msg1 = consumer.poll()
        msg_process(msg1)
        consumer.store_offsets(message=msg1)
        consumer.commit(asynchronous=False)
        print(consumer.position(partitions))

        # msg2
        msg2 = consumer.poll()
        msg_process(msg2)
        consumer.store_offsets(message=msg2)
        consumer.commit(asynchronous=False)
        print(consumer.position(partitions))

        # msg3
        msg3 = consumer.poll()
        msg_process(msg3)

        # msg4
        msg4 = consumer.poll()
        msg_process(msg4)

        # msg5
        msg5 = consumer.poll()
        msg_process(msg5)
        print(consumer.position(partitions))

        # msg6
        msg6 = consumer.poll()
        msg_process(msg6)
        print(consumer.position(partitions))

        print("set back to msg3")
        # back to msg3
        consumer.store_offsets(message=msg3)
        print("commit again")
        consumer.commit(message=msg3, asynchronous=False)
        print(consumer.position(partitions))

        msg_new = consumer.poll()
        msg_process(msg_new)

    finally:
        # Close down the consumer. Because 'enable.auto.commit' is False,
        # close() does not commit any final offsets here.
        consumer.close()


def main():
    basic_consume_loop(consumer, topics)


if __name__ == '__main__':
    main()

If I produce the following messages with the console producer:

1
2
3
4
5
6
7

The script gives the following result:

1
[TopicPartition{topic=test-topic2,partition=0,offset=1,error=None}]
2
[TopicPartition{topic=test-topic2,partition=0,offset=2,error=None}]
3
4
5
[TopicPartition{topic=test-topic2,partition=0,offset=5,error=None}]
6
[TopicPartition{topic=test-topic2,partition=0,offset=6,error=None}]
set back to msg3
commit again
[TopicPartition{topic=test-topic2,partition=0,offset=6,error=None}]
7

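I expected msg3 through msg6 to be the same message (msg3), since I did not commit anything after msg2, but the offset keeps increasing and the consumer keeps polling new messages. Even after committing msg3 again, the next poll() returns 7. Why does this happen? Is something wrong in my code?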


Solution

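  • Committing only stores the offset in the __consumer_offsets topic. It does not change the consumer's current fetch position, which is what position() reports and where the next poll() continues from. The position advances past every message that poll() returns, whether or not anything was committed, so msg3 through msg6 are four different messages.

    The committed offset is only read when the partition is (re)assigned, for example after a restart or a rebalance. Note also that commit(message=msg3) commits msg3.offset() + 1, so a restarted consumer in the same group would resume at msg4, not at msg3.

    You are still required to seek if you want to re-consume old messages in the running consumer, e.g. consumer.seek(TopicPartition(msg3.topic(), msg3.partition(), msg3.offset())) before the next poll().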
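To make the difference between the committed offset and the fetch position concrete, here is a toy, pure-Python model of a single partition. It is a hypothetical sketch, not the confluent_kafka API: the class ToyPartitionConsumer and its methods are invented for illustration, and commit(offset) is assumed to record offset + 1, the way store_offsets(message=msg) and commit(message=msg) do. Replaying the question's sequence on it reproduces the printed positions and the final 7.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class ToyPartitionConsumer:
    """Hypothetical in-memory model of ONE partition's consumer state.

    NOT the confluent_kafka API. It only tracks the two offsets that matter
    here: the fetch position (where poll() reads next) and the committed
    offset (what would be stored in __consumer_offsets).
    """
    log: list                         # partition contents, list index == offset
    position: int = 0                 # next offset poll() will return
    committed: Optional[int] = None   # committed offset = next message to read

    def poll(self) -> Optional[Tuple[int, str]]:
        # poll() always reads from the current position and advances it,
        # regardless of what has (or has not) been committed.
        if self.position >= len(self.log):
            return None
        offset = self.position
        self.position += 1
        return offset, self.log[offset]

    def commit(self, offset: int) -> None:
        # Records offset + 1 (the next message to consume), mirroring
        # commit(message=msg). It does NOT move self.position.
        self.committed = offset + 1

    def seek(self, offset: int) -> None:
        # Seeking is what moves the fetch position backwards.
        self.position = offset

    def restart(self) -> None:
        # A new consumer in the same group starts at the committed offset.
        # Assumption: with nothing committed, fall back to offset 0
        # (in Kafka this would depend on auto.offset.reset).
        self.position = self.committed if self.committed is not None else 0


if __name__ == "__main__":
    c = ToyPartitionConsumer(["1", "2", "3", "4", "5", "6", "7"])

    for _ in range(2):                # msg1, msg2: poll and commit each one
        offset, _value = c.poll()
        c.commit(offset)
    msg3_offset, _value = c.poll()    # msg3 (offset 2), not committed yet
    for _ in range(3):                # msg4, msg5, msg6
        c.poll()
    c.commit(msg3_offset)             # "set back to msg3" by committing it

    print(c.position, c.committed)    # 6 3  (commit did not move the position)
    print(c.poll())                   # (6, '7'), same as the question's output

    c.seek(msg3_offset)               # rewinding the running consumer needs a seek
    print(c.poll())                   # (2, '3')

    c.restart()                       # a restarted consumer resumes after msg3
    print(c.poll())                   # (3, '4')
```

In the real client, keep in mind that Consumer.seek() only takes effect on a partition that is currently assigned to the consumer; the toy model has no notion of assignment.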